Posted to user@spark.apache.org by vonnagy <iv...@vadio.com> on 2016/11/04 17:20:04 UTC

Instability issues with Spark 2.0.1 and Kafka 0.10

I am running into issues using Spark 2.0.1 and Kafka 0.10. I have two jobs,
one that uses a Kafka stream and one that uses just the KafkaRDD.

With the KafkaRDD, I continually get the "Failed to get records .. after
polling" error. I have adjusted the poll timeout with
`spark.streaming.kafka.consumer.poll.ms` and the number of records per poll
with Kafka's `max.poll.records`. Even when it gets records, it is extremely slow.
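
For readers following along, here is a minimal sketch of where these two settings live, assuming the usual split between Spark properties and Kafka consumer parameters. The object name `PollSettings` and the values (10 seconds, 500 records) are illustrative assumptions, not recommendations made in this thread.

```scala
object PollSettings {
  // Spark-side property: how long an executor's cached consumer waits in poll()
  // before the "Failed to get records ... after polling" error is raised.
  // It is a Spark property, so it goes into the Spark configuration.
  val sparkSettings: Map[String, String] = Map(
    "spark.streaming.kafka.consumer.poll.ms" -> "10000" // illustrative value
  )

  // Kafka-side consumer parameter: upper bound on records returned by one poll().
  // It is handed to the Kafka consumer through kafkaParams.
  val kafkaParams: Map[String, Object] = Map(
    "max.poll.records" -> Int.box(500) // illustrative value
  )
}
```

The first map would be applied to the Spark configuration (for example as `--conf key=value` pairs), while the second is merged into the `kafkaParams` passed to `KafkaUtils`.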

When working with multiple KafkaRDDs in parallel I get the dreaded
`ConcurrentModificationException`. The Spark logic is supposed to use a
CachedKafkaConsumer based on the topic and partition. This is supposed to
guarantee thread safety, but I continually get this error along with the
polling timeout. 

Has anyone else tried to use Spark 2 with Kafka 0.10 and had any success? At
this point it is completely useless in my experience. With Spark 1.6 and
Kafka 0.8.x, I never had these problems.



--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Instability-issues-with-Spark-2-0-1-and-Kafka-0-10-tp28017.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe e-mail: user-unsubscribe@spark.apache.org


Re: Instability issues with Spark 2.0.1 and Kafka 0.10

Posted by Cody Koeninger <co...@koeninger.org>.
Preferred locations are only advisory; you can still get tasks scheduled on
other executors.  You can try bumping up the size of the cache to see if
that is causing the issue you're seeing.
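
A minimal sketch of what bumping the cache size could look like, assuming the cache properties described in the Kafka 0.10 integration guide and the defaults quoted in this thread (16 initial, 64 max). The object and method names are hypothetical helpers, and the new ceiling is whatever value you decide to try.

```scala
object ConsumerCacheSettings {
  // Defaults as quoted in this thread: 16 initial, 64 max.
  val defaults: Map[String, String] = Map(
    "spark.streaming.kafka.consumer.cache.initialCapacity" -> "16",
    "spark.streaming.kafka.consumer.cache.maxCapacity" -> "64"
  )

  // Returns the settings with a raised ceiling; initial capacity is left alone.
  def withMaxCapacity(newMax: Int): Map[String, String] = {
    require(newMax > 0, "cache capacity must be positive")
    defaults.updated("spark.streaming.kafka.consumer.cache.maxCapacity", newMax.toString)
  }
}
```

Each resulting pair can be passed to `spark-submit` as `--conf key=value` or set on the Spark configuration before the context is created.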

On Nov 13, 2016 12:47, "Ivan von Nagy" <iv...@vadio.com> wrote:

> As the code iterates through the parallel list, it is processing up to 8
> KafkaRDDs at a time. Each has its own unique topic and consumer group now.
> Every topic has 4 partitions, so technically there should never be more
> than 32 CachedKafkaConsumers. However, this seems to not be the case as we
> are using the default settings for cache size (16 initial -> 64 max) and
> PreferConsistent for the location strategy. I do notice the concurrent
> modification exception occurs when a cached consumer is being dropped out
> of the cache when it reaches the max, 64. After looking at the code, the
> KafkaRDDIterator will only close its consumer if we are not caching (makes
> sense), but there is no other way to close/drop the consumer until it gets
> dropped from the cache. Perhaps there is an issue with resources here since
> RDDs don't inherently have any resource management support, like "I am done
> so cleanup now".
>
> Over the course of this job, it will probably process upwards of 100-150
> different channels so about 400-600 partitions. Does this mean we should
> bump the cache size that high even though only about 8 channels (32
> partitions) are being handled by the executors at any given time?
>
> Thanks,
>
> Ivan
>
> On Sat, Nov 12, 2016 at 1:25 PM, Cody Koeninger <co...@koeninger.org>
> wrote:
>
>> You should not be getting consumer churn on executors at all, that's
>> the whole point of the cache.  How many partitions are you trying to
>> process per executor?
>>
>> http://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html#locationstrategies
>>
>> gives instructions on the default size of the cache and how to increase
>> it.
>>
>> On Sat, Nov 12, 2016 at 2:15 PM, Ivan von Nagy <iv...@vadio.com> wrote:
>> > Hi Sean,
>> >
>> > Thanks for responding. We have run our jobs with internal parallel
>> > processing for well over a year (Spark 1.5, 1.6 and Kafka 0.8.2.2) and did
>> > not encounter any of these issues until upgrading to Spark 2.0.1 and Kafka
>> > 0.10 clients. If we process serially, then we sometimes get the errors, but
>> > far less often. Also, if done sequentially it sometimes takes more than 2x
>> > as long, which is not an option for this particular job.
>> >
>> > I posted another example on Nov 10th, which is the example below. We
>> > basically iterate through a list in parallel, and sometimes the list could
>> > be upwards of a hundred elements. The parallelism in Scala/Spark limits
>> > this to about 8 at a time on our nodes. For performance reasons we process
>> > in parallel, and we also separate each channel since each has its own topic.
>> > We don't combine all into one KafkaRDD because that means we have to
>> > process all or nothing if an error occurs. This way, if a couple of channels
>> > fail, we can re-run the job and it will only process those channels.
>> >
>> > This has just been perplexing since we had never encountered any errors
>> > for well over a year using the prior versions. At this time, I am just
>> > seeking any configuration options or code changes that we may be missing,
>> > or even, at a lower level, what fundamentally changed in Spark 2 and Kafka
>> > 0.10 that surfaced these issues.
>> >
>> > We continue to use Spark 1.6 with the Kafka 0.8.x clients until this can be
>> > figured out; however, it is a deal breaker for us to upgrade to Spark 2.x
>> > with Kafka 0.10 clients. On a side note, we have not encountered any issues
>> > with the Kafka Producers; this is simply with the KafkaRDD and its use of
>> > CachedKafkaConsumer. Any help is much appreciated.
>> >
>> > Thanks,
>> >
>> > Ivan
>> >
>> > Example usage with KafkaRDD:
>> > val channels = Seq("channel1", "channel2")
>> >
>> > channels.toParArray.foreach { channel =>
>> >   val consumer = new KafkaConsumer[K, V](kafkaParams.asJava)
>> >
>> >   // Get offsets for the given topic and the consumer group "$prefix-$topic"
>> >   val offsetRanges = getOffsets(s"$prefix-$topic", channel)
>> >
>> >   val ds = KafkaUtils.createRDD[K, V](context,
>> >         kafkaParams.asJava,
>> >         offsetRanges,
>> >         PreferConsistent).toDS[V]
>> >
>> >   // Do some aggregations
>> >   ds.agg(...)
>> >   // Save the data
>> >   ds.write.mode(SaveMode.Append).parquet(somePath)
>> >   // Save offsets using a KafkaConsumer
>> >   consumer.commitSync(newOffsets.asJava)
>> >   consumer.close()
>> > }
>> >
>> > On Sat, Nov 12, 2016 at 11:46 AM, Sean McKibben <gr...@graphex.com>
>> wrote:
>> >>
>> >> How are you iterating through your RDDs in parallel? In the past (Spark
>> >> 1.5.2) when I've had actions being performed on multiple RDDs concurrently
>> >> using futures, I've encountered some pretty bad behavior in Spark,
>> >> especially during job retries. Very difficult to explain things, like
>> >> records from one RDD leaking into a totally different (though shared
>> >> lineage) RDD during job retries. I'm not sure what documentation exists
>> >> around parallelizing on top of Spark's existing parallelization approach,
>> >> but I would venture a guess that that could be the source of your
>> >> concurrent access problems, and potentially other hidden issues. Have you
>> >> tried a version of your app which doesn't parallelize actions on RDDs, but
>> >> instead serially processes each RDD? I'm sure it isn't ideal
>> >> performance-wise, but it seems like a good choice for an A/B test.
>> >>
>> >> The poll.ms issue could very well be settings or capability of your Kafka
>> >> cluster. I think other (non-Spark) approaches may have less consumer churn
>> >> and be less susceptible to things like GC pauses or coordination latency.
>> >> It could also be that the number of consumers being simultaneously created
>> >> on each executor causes a thundering herd problem during initial phases
>> >> (which then causes job retries, which then causes more consumer churn, etc.).
>> >>
>> >> Sean
>> >>
>> >>
>> >>
>> >> On Nov 12, 2016, at 11:14 AM, Ivan von Nagy <iv...@vadio.com> wrote:
>> >>
>> >> The code was changed to use a unique group for each KafkaRDD that was
>> >> created (see Nov 10 post). There is no KafkaRDD being reused. The basic
>> >> logic (see Nov 10 post for example) is: get a list of channels, iterate
>> >> through them in parallel, load a KafkaRDD using a given topic and a
>> >> consumer group that is made from the topic (each RDD uses a different topic
>> >> and group), process the data and write to Parquet files.
>> >>
>> >> Per my Nov 10th post, we still get polling timeouts unless poll.ms is
>> >> set to something like 10 seconds. We also get concurrent modification
>> >> exceptions. I believe the key here is that processing data in parallel is
>> >> where we encounter issues, so we are looking for some possible answers
>> >> surrounding this.
>> >>
>> >> Thanks,
>> >>
>> >> Ivan
>> >>
>> >>
>> >> On Fri, Nov 11, 2016 at 12:12 PM, Cody Koeninger <co...@koeninger.org>
>> >> wrote:
>> >>>
>> >>> It is already documented that you must use a different group id, which
>> >>> as far as I can tell you are still not doing.
>> >>>
>> >>>
>> >>> On Nov 10, 2016 7:43 PM, "Shixiong(Ryan) Zhu" <
>> shixiong@databricks.com>
>> >>> wrote:
>> >>>>
>> >>>> Yeah, the KafkaRDD cannot be reused. It's better to document it.
>> >>>>
>> >>>> On Thu, Nov 10, 2016 at 8:26 AM, Ivan von Nagy <iv...@vadio.com>
>> wrote:
>> >>>>>
>> >>>>> Ok, I have split the KafkaRDD logic so each uses its own group and
>> >>>>> bumped the poll.ms to 10 seconds. Anything less than 2 seconds on the
>> >>>>> poll.ms ends up with a timeout and exception, so I am still perplexed on
>> >>>>> that one. The new error I am getting now is a
>> >>>>> `ConcurrentModificationException` when Spark is trying to remove the
>> >>>>> CachedKafkaConsumer.
>> >>>>>
>> >>>>> java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access
>> >>>>>   at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1431)
>> >>>>>   at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:1361)
>> >>>>>   at org.apache.spark.streaming.kafka010.CachedKafkaConsumer$$anon$1.removeEldestEntry(CachedKafkaConsumer.scala:128)
>> >>>>>   at java.util.LinkedHashMap.afterNodeInsertion(LinkedHashMap.java:299)
>> >>>>>
>> >>>>> Here is the basic logic:
>> >>>>>
>> >>>>> Using KafkaRDD - This takes a list of channels and processes them in
>> >>>>> parallel using the KafkaRDD directly. They each use a distinct consumer
>> >>>>> group (s"$prefix-$topic"), and each has its own topic and each topic has
>> >>>>> 4 partitions. We routinely get timeout errors when polling for data when
>> >>>>> the poll.ms is less than 2 seconds. This occurs whether we process in
>> >>>>> parallel or sequentially.
>> >>>>>
>> >>>>> Example usage with KafkaRDD:
>> >>>>> val channels = Seq("channel1", "channel2")
>> >>>>>
>> >>>>> channels.toParArray.foreach { channel =>
>> >>>>>   val consumer = new KafkaConsumer[K, V](kafkaParams.asJava)
>> >>>>>
>> >>>>>   // Get offsets for the given topic and the consumer group "$prefix-$topic"
>> >>>>>   val offsetRanges = getOffsets(s"$prefix-$topic", channel)
>> >>>>>
>> >>>>>   val ds = KafkaUtils.createRDD[K, V](context,
>> >>>>>         kafkaParams.asJava,
>> >>>>>         offsetRanges,
>> >>>>>         PreferConsistent).toDS[V]
>> >>>>>
>> >>>>>   // Do some aggregations
>> >>>>>   ds.agg(...)
>> >>>>>   // Save the data
>> >>>>>   ds.write.mode(SaveMode.Append).parquet(somePath)
>> >>>>>   // Save offsets using a KafkaConsumer
>> >>>>>   consumer.commitSync(newOffsets.asJava)
>> >>>>>   consumer.close()
>> >>>>> }
>> >>>>>
>> >>>>> I am not sure why the concurrency issue is there, as I have tried to
>> >>>>> debug and also looked at the KafkaConsumer code, but everything looks
>> >>>>> like it should not occur. The things to figure out are why this occurs
>> >>>>> when running in parallel, and also why the timeouts still occur.
>> >>>>>
>> >>>>> Thanks,
>> >>>>>
>> >>>>> Ivan
>> >>>>>
>> >>>>> On Mon, Nov 7, 2016 at 11:55 AM, Cody Koeninger <cody@koeninger.org>
>> >>>>> wrote:
>> >>>>>>
>> >>>>>> There definitely is Kafka documentation indicating that you should use
>> >>>>>> a different consumer group for logically different subscribers; this
>> >>>>>> is really basic to Kafka:
>> >>>>>>
>> >>>>>> http://kafka.apache.org/documentation#intro_consumers
>> >>>>>>
>> >>>>>> As for your comment that "commit async after each RDD, which is not
>> >>>>>> really viable also", how is it not viable?  Again, committing offsets
>> >>>>>> to Kafka doesn't give you reliable delivery semantics unless your
>> >>>>>> downstream data store is idempotent.  If your downstream data store is
>> >>>>>> idempotent, then it shouldn't matter to you when offset commits
>> >>>>>> happen, as long as they happen within a reasonable time after the data
>> >>>>>> is written.
>> >>>>>>
>> >>>>>> Do you want to keep arguing with me, or follow my advice and proceed
>> >>>>>> with debugging any remaining issues after you make the changes I
>> >>>>>> suggested?
>> >>>>>>
>> >>>>>> On Mon, Nov 7, 2016 at 1:35 PM, Ivan von Nagy <iv...@vadio.com>
>> wrote:
>> >>>>>> > With our stream version, we update the offsets for only the partition
>> >>>>>> > we are operating on. We even break down the partition into smaller
>> >>>>>> > batches and then update the offsets after each batch within the
>> >>>>>> > partition. With Spark 1.6 and Kafka 0.8.x this was not an issue, and as
>> >>>>>> > Sean pointed out, this is not necessarily a Spark issue since Kafka no
>> >>>>>> > longer allows you to simply update the offsets for a given consumer
>> >>>>>> > group. You have to subscribe or assign partitions to even do so.
>> >>>>>> >
>> >>>>>> > As for storing the offsets in some other place like a DB, I don't find
>> >>>>>> > this useful because you then can't use tools like Kafka Manager. In
>> >>>>>> > order to do so you would have to store in a DB and then circle back and
>> >>>>>> > update Kafka afterwards. This means you have to keep two sources in
>> >>>>>> > sync, which is not really a good idea.
>> >>>>>> >
>> >>>>>> > It is a challenge in Spark to use the Kafka offsets since the driver
>> >>>>>> > keeps subscribed to the topic(s) and consumer group, while the
>> >>>>>> > executors prepend "spark-executor-" to the consumer group. The stream
>> >>>>>> > (driver) does allow you to commit async after each RDD, which is not
>> >>>>>> > really viable also. I have thought of implementing an Akka actor system
>> >>>>>> > on the driver and sending it messages from the executor code to update
>> >>>>>> > the offsets, but then that is asynchronous as well, so not really a
>> >>>>>> > good solution.
>> >>>>>> >
>> >>>>>> > I have no idea why Kafka made this change and also why in the parallel
>> >>>>>> > KafkaRDD application we would be advised to use different consumer
>> >>>>>> > groups for each RDD. It seems strange to me that different consumer
>> >>>>>> > groups would be required or advised. There is no Kafka documentation
>> >>>>>> > that I know of that states this. The biggest issue I see with the
>> >>>>>> > parallel KafkaRDD is the timeouts. I have tried to set poll.ms to 30
>> >>>>>> > seconds and still get the issue. Something is just not right here. As I
>> >>>>>> > mentioned with the streaming application, with Spark 1.6 and Kafka
>> >>>>>> > 0.8.x we never saw this issue. We have been running the same basic
>> >>>>>> > logic for over a year now without one hitch at all.
>> >>>>>> >
>> >>>>>> > Ivan
>> >>>>>> >
>> >>>>>> >
>> >>>>>> > On Mon, Nov 7, 2016 at 11:16 AM, Cody Koeninger <
>> cody@koeninger.org>
>> >>>>>> > wrote:
>> >>>>>> >>
>> >>>>>> >> Someone can correct me, but I'm pretty sure Spark dstreams (in
>> >>>>>> >> general, not just kafka) have been progressing on to the next batch
>> >>>>>> >> after a given batch aborts for quite some time now.  Yet another
>> >>>>>> >> reason I put offsets in my database transactionally.  My jobs throw
>> >>>>>> >> exceptions if the offset in the DB isn't what I expected it to be.
>> >>>>>> >>
>> >>>>>> >>
>> >>>>>> >>
>> >>>>>> >>
>> >>>>>> >> On Mon, Nov 7, 2016 at 1:08 PM, Sean McKibben <
>> graphex@graphex.com>
>> >>>>>> >> wrote:
>> >>>>>> >> > I've been encountering the same kinds of timeout issues as Ivan,
>> >>>>>> >> > using the "Kafka Stream" approach that he is using, except I'm
>> >>>>>> >> > storing my offsets manually from the driver to Zookeeper in the
>> >>>>>> >> > Kafka 8 format. I haven't yet implemented the KafkaRDD approach, and
>> >>>>>> >> > therefore don't have the concurrency issues, but a very similar use
>> >>>>>> >> > case is coming up for me soon; it's just been backburnered until I
>> >>>>>> >> > can get streaming to be more reliable (I will definitely ensure
>> >>>>>> >> > unique group IDs when I do). Offset commits are certainly more
>> >>>>>> >> > painful in Kafka 0.10, and that doesn't have anything to do with
>> >>>>>> >> > Spark.
>> >>>>>> >> >
>> >>>>>> >> > While I may be able to alleviate the timeout by just increasing it,
>> >>>>>> >> > I've noticed something else that is more worrying: When one task
>> >>>>>> >> > fails 4 times in a row (i.e. "Failed to get records for _ after
>> >>>>>> >> > polling for _"), Spark aborts the Stage and Job with "Job aborted
>> >>>>>> >> > due to stage failure: Task _ in stage _ failed 4 times". That's
>> >>>>>> >> > fine, and it's the behavior I want, but instead of stopping the
>> >>>>>> >> > Application there (as previous versions of Spark did) the next
>> >>>>>> >> > microbatch marches on and offsets are committed ahead of the failed
>> >>>>>> >> > microbatch. Suddenly my at-least-once app becomes more
>> >>>>>> >> > sometimes-at-least-once, which is no good. In order for Spark to
>> >>>>>> >> > display that failure, I must be propagating the errors up to Spark,
>> >>>>>> >> > but the behavior of marching forward with the next microbatch seems
>> >>>>>> >> > to be new, and a big potential for data loss in streaming
>> >>>>>> >> > applications.
>> >>>>>> >> >
>> >>>>>> >> > Am I perhaps missing a setting to stop the entire streaming
>> >>>>>> >> > application once spark.task.maxFailures is reached? Has anyone else
>> >>>>>> >> > seen this behavior of a streaming application skipping over failed
>> >>>>>> >> > microbatches?
>> >>>>>> >> >
>> >>>>>> >> > Thanks,
>> >>>>>> >> > Sean
>> >>>>>> >> >
>> >>>>>> >> >
>> >>>>>> >> >> On Nov 4, 2016, at 2:48 PM, Cody Koeninger <
>> cody@koeninger.org>
>> >>>>>> >> >> wrote:
>> >>>>>> >> >>
>> >>>>>> >> >> So basically what I am saying is
>> >>>>>> >> >>
>> >>>>>> >> >> - increase poll.ms
>> >>>>>> >> >> - use a separate group id everywhere
>> >>>>>> >> >> - stop committing offsets under the covers
>> >>>>>> >> >>
>> >>>>>> >> >> That should eliminate all of those as possible causes, and then we
>> >>>>>> >> >> can see if there are still issues.
>> >>>>>> >> >>
>> >>>>>> >> >> As far as 0.8 vs 0.10, Spark doesn't require you to assign or
>> >>>>>> >> >> subscribe to a topic in order to update offsets, Kafka does.  If you
>> >>>>>> >> >> don't like the new Kafka consumer api, the existing 0.8 simple
>> >>>>>> >> >> consumer api should be usable with later brokers.  As long as you
>> >>>>>> >> >> don't need SSL or dynamic subscriptions, and it meets your needs,
>> >>>>>> >> >> keep using it.
>> >>>>>> >> >>
>> >>>>>> >> >> On Fri, Nov 4, 2016 at 3:37 PM, Ivan von Nagy <
>> ivan@vadio.com>
>> >>>>>> >> >> wrote:
>> >>>>>> >> >>> Yes, the parallel KafkaRDD uses the same consumer group, but each
>> >>>>>> >> >>> RDD uses a single distinct topic. For example, the group would be
>> >>>>>> >> >>> something like "storage-group", and the topics would be
>> >>>>>> >> >>> "storage-channel1" and "storage-channel2". In each thread a
>> >>>>>> >> >>> KafkaConsumer is started and assigned the partitions, and then
>> >>>>>> >> >>> commit offsets is called after the RDD is processed. This should
>> >>>>>> >> >>> not interfere with the consumer group used by the executors, which
>> >>>>>> >> >>> would be "spark-executor-storage-group".
>> >>>>>> >> >>>
>> >>>>>> >> >>> In the streaming example there is a single topic ("client-events")
>> >>>>>> >> >>> and group ("processing-group"). A single stream is created and
>> >>>>>> >> >>> offsets are manually updated from the executor after each partition
>> >>>>>> >> >>> is handled. This was a challenge since Spark now requires one to
>> >>>>>> >> >>> assign or subscribe to a topic in order to even update the offsets.
>> >>>>>> >> >>> In 0.8.2.x you did not have to worry about that. This approach
>> >>>>>> >> >>> limits your exposure to duplicate data since idempotent records are
>> >>>>>> >> >>> not entirely possible in our scenario, at least not without a lot
>> >>>>>> >> >>> of re-running of logic to de-dup.
>> >>>>>> >> >>>
>> >>>>>> >> >>> Thanks,
>> >>>>>> >> >>>
>> >>>>>> >> >>> Ivan
>> >>>>>> >> >>>
>> >>>>>> >> >>> On Fri, Nov 4, 2016 at 1:24 PM, Cody Koeninger
>> >>>>>> >> >>> <co...@koeninger.org>
>> >>>>>> >> >>> wrote:
>> >>>>>> >> >>>>
>> >>>>>> >> >>>> So just to be clear, the answers to my questions are
>> >>>>>> >> >>>>
>> >>>>>> >> >>>> - you are not using different group ids, you're using the same
>> >>>>>> >> >>>> group id everywhere
>> >>>>>> >> >>>>
>> >>>>>> >> >>>> - you are committing offsets manually
>> >>>>>> >> >>>>
>> >>>>>> >> >>>> Right?
>> >>>>>> >> >>>>
>> >>>>>> >> >>>> If you want to eliminate network or kafka misbehavior as a source,
>> >>>>>> >> >>>> tune poll.ms upwards even higher.
>> >>>>>> >> >>>>
>> >>>>>> >> >>>> You must use different group ids for different rdds or streams.
>> >>>>>> >> >>>> Kafka consumers won't behave the way you expect if they are all in
>> >>>>>> >> >>>> the same group id, and the consumer cache is keyed by group id.
>> >>>>>> >> >>>> Yes, the executor will tack "spark-executor-" on to the beginning,
>> >>>>>> >> >>>> but if you give it the same base group id, it will be the same.
>> >>>>>> >> >>>> And the driver will use the group id you gave it, unmodified.
>> >>>>>> >> >>>>
>> >>>>>> >> >>>> Finally, I really can't help you if you're manually writing your
>> >>>>>> >> >>>> own code to commit offsets directly to Kafka.  Trying to minimize
>> >>>>>> >> >>>> duplicates that way doesn't really make sense; your system must be
>> >>>>>> >> >>>> able to handle duplicates if you're using kafka as an offsets
>> >>>>>> >> >>>> store, it can't do transactional exactly once.
>> >>>>>> >> >>>>
>> >>>>>> >> >>>> On Fri, Nov 4, 2016 at 1:48 PM, vonnagy <iv...@vadio.com>
>> >>>>>> >> >>>> wrote:
>> >>>>>> >> >>>>> Here are some examples and details of the scenarios. The KafkaRDD
>> >>>>>> >> >>>>> is the most prone to polling timeouts and concurrent modification
>> >>>>>> >> >>>>> errors.
>> >>>>>> >> >>>>>
>> >>>>>> >> >>>>> *Using KafkaRDD* - This takes a list of channels and processes
>> >>>>>> >> >>>>> them in parallel using the KafkaRDD directly. They all use the
>> >>>>>> >> >>>>> same consumer group ('storage-group'), but each has its own topic
>> >>>>>> >> >>>>> and each topic has 4 partitions. We routinely get timeout errors
>> >>>>>> >> >>>>> when polling for data. This occurs whether we process in parallel
>> >>>>>> >> >>>>> or sequentially.
>> >>>>>> >> >>>>>
>> >>>>>> >> >>>>> *Spark Kafka setting:*
>> >>>>>> >> >>>>> spark.streaming.kafka.consumer.poll.ms=2000
>> >>>>>> >> >>>>>
>> >>>>>> >> >>>>> *Kafka Consumer Params:*
>> >>>>>> >> >>>>> metric.reporters = []
>> >>>>>> >> >>>>> metadata.max.age.ms = 300000
>> >>>>>> >> >>>>> partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor]
>> >>>>>> >> >>>>> reconnect.backoff.ms = 50
>> >>>>>> >> >>>>> sasl.kerberos.ticket.renew.window.factor = 0.8
>> >>>>>> >> >>>>> max.partition.fetch.bytes = 1048576
>> >>>>>> >> >>>>> bootstrap.servers = [somemachine:31000]
>> >>>>>> >> >>>>> ssl.keystore.type = JKS
>> >>>>>> >> >>>>> enable.auto.commit = false
>> >>>>>> >> >>>>> sasl.mechanism = GSSAPI
>> >>>>>> >> >>>>> interceptor.classes = null
>> >>>>>> >> >>>>> exclude.internal.topics = true
>> >>>>>> >> >>>>> ssl.truststore.password = null
>> >>>>>> >> >>>>> client.id =
>> >>>>>> >> >>>>> ssl.endpoint.identification.algorithm = null
>> >>>>>> >> >>>>> max.poll.records = 1000
>> >>>>>> >> >>>>> check.crcs = true
>> >>>>>> >> >>>>> request.timeout.ms = 40000
>> >>>>>> >> >>>>> heartbeat.interval.ms = 3000
>> >>>>>> >> >>>>> auto.commit.interval.ms = 5000
>> >>>>>> >> >>>>> receive.buffer.bytes = 65536
>> >>>>>> >> >>>>> ssl.truststore.type = JKS
>> >>>>>> >> >>>>> ssl.truststore.location = null
>> >>>>>> >> >>>>> ssl.keystore.password = null
>> >>>>>> >> >>>>> fetch.min.bytes = 1
>> >>>>>> >> >>>>> send.buffer.bytes = 131072
>> >>>>>> >> >>>>> value.deserializer = class com.vadio.analytics.spark.storage.ClientEventJsonOptionDeserializer
>> >>>>>> >> >>>>> group.id = storage-group
>> >>>>>> >> >>>>> retry.backoff.ms = 100
>> >>>>>> >> >>>>> sasl.kerberos.kinit.cmd = /usr/bin/kinit
>> >>>>>> >> >>>>> sasl.kerberos.service.name = null
>> >>>>>> >> >>>>> sasl.kerberos.ticket.renew.jitter = 0.05
>> >>>>>> >> >>>>> ssl.trustmanager.algorithm = PKIX
>> >>>>>> >> >>>>> ssl.key.password = null
>> >>>>>> >> >>>>> fetch.max.wait.ms = 500
>> >>>>>> >> >>>>> sasl.kerberos.min.time.before.relogin = 60000
>> >>>>>> >> >>>>> connections.max.idle.ms = 540000
>> >>>>>> >> >>>>> session.timeout.ms = 30000
>> >>>>>> >> >>>>> metrics.num.samples = 2
>> >>>>>> >> >>>>> key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
>> >>>>>> >> >>>>> ssl.protocol = TLS
>> >>>>>> >> >>>>> ssl.provider = null
>> >>>>>> >> >>>>> ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
>> >>>>>> >> >>>>> ssl.keystore.location = null
>> >>>>>> >> >>>>> ssl.cipher.suites = null
>> >>>>>> >> >>>>> security.protocol = PLAINTEXT
>> >>>>>> >> >>>>> ssl.keymanager.algorithm = SunX509
>> >>>>>> >> >>>>> metrics.sample.window.ms = 30000
>> >>>>>> >> >>>>> auto.offset.reset = earliest
>> >>>>>> >> >>>>>
>> >>>>>> >> >>>>> *Example usage with KafkaRDD:*
>> >>>>>> >> >>>>> val channels = Seq("channel1", "channel2")
>> >>>>>> >> >>>>>
>> >>>>>> >> >>>>> channels.toParArray.foreach { channel =>
>> >>>>>> >> >>>>>  val consumer = new KafkaConsumer[K, V](kafkaParams.asJava)
>> >>>>>> >> >>>>>
>> >>>>>> >> >>>>>  // Get offsets for the given topic and the consumer group 'storage-group'
>> >>>>>> >> >>>>>  val offsetRanges = getOffsets("storage-group", channel)
>> >>>>>> >> >>>>>
>> >>>>>> >> >>>>>  val ds = KafkaUtils.createRDD[K, V](context,
>> >>>>>> >> >>>>>        kafkaParams.asJava,
>> >>>>>> >> >>>>>        offsetRanges,
>> >>>>>> >> >>>>>        PreferConsistent).toDS[V]
>> >>>>>> >> >>>>>
>> >>>>>> >> >>>>>  // Do some aggregations
>> >>>>>> >> >>>>>  ds.agg(...)
>> >>>>>> >> >>>>>  // Save the data
>> >>>>>> >> >>>>>  ds.write.mode(SaveMode.Append).parquet(somePath)
>> >>>>>> >> >>>>>  // Save offsets using a KafkaConsumer
>> >>>>>> >> >>>>>  consumer.commitSync(newOffsets.asJava)
>> >>>>>> >> >>>>>  consumer.close()
>> >>>>>> >> >>>>> }
>> >>>>>> >> >>>>>
>> >>>>>> >> >>>>>
>> >>>>>> >> >>>>> *Example usage with Kafka Stream:*
>> >>>>>> >> >>>>> This creates a stream and processes events in each partition. At
>> >>>>>> >> >>>>> the end of processing for each partition, we update the offsets
>> >>>>>> >> >>>>> for that partition. This is challenging to do, but is better than
>> >>>>>> >> >>>>> calling commitAsync on the stream, because that occurs after the
>> >>>>>> >> >>>>> /entire/ RDD has been processed. This method minimizes duplicates
>> >>>>>> >> >>>>> in an exactly once environment. Since the executors use their own
>> >>>>>> >> >>>>> custom group "spark-executor-processor-group" and the commit is
>> >>>>>> >> >>>>> buried in private functions, we are unable to use the executor's
>> >>>>>> >> >>>>> cached consumer to update the offsets. This requires us to go
>> >>>>>> >> >>>>> through multiple steps to update the Kafka offsets accordingly.
>> >>>>>> >> >>>>>
>> >>>>>> >> >>>>> val offsetRanges = getOffsets("processor-group", "my-topic")
>> >>>>>> >> >>>>>
>> >>>>>> >> >>>>> val stream = KafkaUtils.createDirectStream[K, V](context,
>> >>>>>> >> >>>>>      PreferConsistent,
>> >>>>>> >> >>>>>      Subscribe[K, V](Seq("my-topic") asJavaCollection,
>> >>>>>> >> >>>>>        kafkaParams,
>> >>>>>> >> >>>>>        offsetRanges))
>> >>>>>> >> >>>>>
>> >>>>>> >> >>>>> stream.foreachRDD { rdd =>
>> >>>>>> >> >>>>>    val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
>> >>>>>> >> >>>>>
>> >>>>>> >> >>>>>    // Transform our data
>> >>>>>> >> >>>>>   rdd.foreachPartition { events =>
>> >>>>>> >> >>>>>       // Establish a consumer in the executor so we can update
>> >>>>>> >> >>>>>       // offsets after each partition.
>> >>>>>> >> >>>>>       // This class is homegrown and uses the KafkaConsumer to help
>> >>>>>> >> >>>>>       // get/set offsets
>> >>>>>> >> >>>>>       val consumer = new ConsumerUtils(kafkaParams)
>> >>>>>> >> >>>>>       // do something with our data
>> >>>>>> >> >>>>>
>> >>>>>> >> >>>>>       // Write the offsets that were updated in this partition
>> >>>>>> >> >>>>>       consumer.setConsumerOffsets("processor-group",
>> >>>>>> >> >>>>>          Map(TopicAndPartition(tp.topic, tp.partition) -> endOffset))
>> >>>>>> >> >>>>>   }
>> >>>>>> >> >>>>> }
>> >>>>>> >> >>>>>
>> >>>>>> >> >>>>>
>> >>>>>> >> >>>>>
>> >>>>>> >> >>>>> --
>> >>>>>> >> >>>>> View this message in context:
>> >>>>>> >> >>>>>
>> >>>>>> >> >>>>>
>> >>>>>> >> >>>>> http://apache-spark-user-list.
>> 1001560.n3.nabble.com/Instability-issues-with-Spark-2-0-1-
>> and-Kafka-0-10-tp28017p28020.html
>> >>>>>> >> >>>>> Sent from the Apache Spark User List mailing list archive
>> at
>> >>>>>> >> >>>>> Nabble.com.
>> >>>>>> >> >>>>>
>> >>>>>> >> >>>>>
>> >>>>>> >> >>>>>
>> >>>>>> >> >>>>> ------------------------------
>> ---------------------------------------
>> >>>>>> >> >>>>> To unsubscribe e-mail: user-unsubscribe@spark.apache.org
>> >>>>>> >> >>>>>
>> >>>>>> >> >>>
>> >>>>>> >> >>>
>> >>>>>> >> >>
>> >>>>>> >> >>
>> >>>>>> >> >> ---------------------------------------------------------------------
>> >>>>>> >> >> To unsubscribe e-mail: user-unsubscribe@spark.apache.org
>> >>>>>> >> >>
>> >>>>>> >> >
>> >>>>>> >
>> >>>>>> >
>> >>>>>
>> >>>>>
>> >>>>
>> >>
>> >>
>> >
>>
>
>

Re: Instability issues with Spark 2.0.1 and Kafka 0.10

Posted by Ivan von Nagy <iv...@vadio.com>.
As the code iterates through the parallel list, it is processing up to 8
KafkaRDDs at a time. Each now has its own unique topic and consumer group.
Every topic has 4 partitions, so technically there should never be more
than 32 CachedKafkaConsumers. However, this seems not to be the case, as we
are using the default settings for cache size (16 initial -> 64 max) and
PreferConsistent for the location strategy. I do notice the concurrent
modification exception occurs when a cached consumer is being dropped out
of the cache when it reaches the max, 64. After looking at the code, the
KafkaRDDIterator will only close its consumer if we are not caching (makes
sense), but there is no other way to close/drop the consumer until it gets
dropped from the cache. Perhaps there is an issue with resources here since
RDDs don't inherently have any resource management support, like "I am done
so clean up now".

Over the course of this job, it will probably process upwards of 100-150
different channels so about 400-600 partitions. Does this mean we should
bump the cache size that high even though only about 8 channels (32
partitions) are being handled by the executors at any given time?
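
To make the eviction path concrete, here is a minimal, single-threaded sketch
of a size-capped LinkedHashMap that closes the eldest entry on insert, which
is the shape the stack trace (removeEldestEntry called from
LinkedHashMap.afterNodeInsertion) suggests. It is not Spark's actual code:
CacheSketch, FakeConsumer, ConsumerCache, simulate and the key format are all
hypothetical names made up for illustration, and it assumes every
group/topic/partition is touched exactly once over the life of the job.

```scala
import java.{util => ju}

object CacheSketch {
  // Hypothetical stand-in for a cached Kafka consumer; it only records close().
  final class FakeConsumer(val key: String) {
    var closed = false
    def close(): Unit = { closed = true }
  }

  // Access-ordered map that closes and evicts the eldest entry once the
  // number of entries exceeds maxCapacity.
  final class ConsumerCache(initialCapacity: Int, maxCapacity: Int)
      extends ju.LinkedHashMap[String, FakeConsumer](initialCapacity, 0.75f, true) {
    var evictions = 0

    override def removeEldestEntry(eldest: ju.Map.Entry[String, FakeConsumer]): Boolean = {
      if (size() > maxCapacity) {
        // In a real job another task thread could still be using this consumer.
        eldest.getValue.close()
        evictions += 1
        true
      } else {
        false
      }
    }
  }

  // Inserts one entry per (channel, partition) and returns how many consumers
  // were closed by eviction along the way.
  def simulate(channels: Int, partitionsPerChannel: Int, maxCapacity: Int): Int = {
    val cache = new ConsumerCache(16, maxCapacity)
    for (c <- 1 to channels; p <- 0 until partitionsPerChannel) {
      val key = s"group-channel$c/channel$c/$p"
      cache.put(key, new FakeConsumer(key))
    }
    cache.evictions
  }
}
```

Under those assumptions, CacheSketch.simulate(8, 4, 64) evicts nothing, while
CacheSketch.simulate(150, 4, 64) closes 536 consumers, because old entries are
never released until the cap pushes them out. The sketch only shows when
close() gets called; it does not reproduce the threading problem itself.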

Thanks,

Ivan

On Sat, Nov 12, 2016 at 1:25 PM, Cody Koeninger <co...@koeninger.org> wrote:

> You should not be getting consumer churn on executors at all, that's
> the whole point of the cache.  How many partitions are you trying to
> process per executor?
>
> http://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html#locationstrategies
>
> gives instructions on the default size of the cache and how to increase it.
>
> On Sat, Nov 12, 2016 at 2:15 PM, Ivan von Nagy <iv...@vadio.com> wrote:
> > Hi Sean,
> >
> > Thanks for responding. We have run our jobs with internal parallel
> > processing for well over a year (Spark 1.5, 1.6 and Kafka 0.8.2.2) and did
> > not encounter any of these issues until upgrading to Spark 2.0.1 and Kafka
> > 0.10 clients. If we process serially, then we sometimes get the errors, but
> > far less often. Also, if done sequentially it sometimes takes more than 2x
> > as long, which is not an option for this particular job.
> >
> > I posted another example on Nov 10th which is the example below. We
> > basically iterate through a list in parallel and sometimes the list could be
> > upwards of a hundred elements. The parallelism in Scala/Spark limits to
> > about 8 at a time on our nodes. For performance reasons we process in
> > parallel and we also separate each since each channel has its own topic.
> > We don't combine all into one KafkaRDD because that means we have to process
> > all or nothing if an error occurs. This way if a couple of channels fail, we
> > can re-run the job and it will only process those channels.
> >
> > This has just been perplexing since we had never encountered any errors for
> > well over a year using the prior versions. At this time, I am just seeking
> > any configuration options or code changes that we may be missing or even, at
> > a lower level, what fundamentally changed in Spark 2 and Kafka 0.10 that
> > surfaced these issues.
> >
> > We continue to use Spark 1.6 with the Kafka 0.8.x clients until this can be
> > figured out; however, it is a deal breaker for our upgrade to Spark 2.x
> > with Kafka 0.10 clients. On a side note, we have not encountered any issues
> > with the Kafka Producers; this is simply with the KafkaRDD and its use of
> > CachedKafkaConsumer. Any help is much appreciated.
> >
> > Thanks,
> >
> > Ivan
> >
> > Example usage with KafkaRDD:
> > val channels = Seq("channel1", "channel2")
> >
> > channels.toParArray.foreach { channel =>
> >   val consumer = new KafkaConsumer[K, V](kafkaParams.asJava)
> >
> >   // Get offsets for the given topic and the consumer group "$prefix-$topic"
> >   val offsetRanges = getOffsets(s"$prefix-$topic", channel)
> >
> >   val ds = KafkaUtils.createRDD[K, V](context,
> >         kafkaParams asJava,
> >         offsetRanges,
> >         PreferConsistent).toDS[V]
> >
> >   // Do some aggregations
> >   ds.agg(...)
> >   // Save the data
> >   ds.write.mode(SaveMode.Append).parquet(somePath)
> >   // Save offsets using a KafkaConsumer
> >   consumer.commitSync(newOffsets.asJava)
> >   consumer.close()
> > }
> >
> > On Sat, Nov 12, 2016 at 11:46 AM, Sean McKibben <gr...@graphex.com>
> wrote:
> >>
> >> How are you iterating through your RDDs in parallel? In the past (Spark
> >> 1.5.2) when I've had actions being performed on multiple RDDs concurrently
> >> using futures, I've encountered some pretty bad behavior in Spark,
> >> especially during job retries. Very difficult to explain things, like
> >> records from one RDD leaking into a totally different (though shared
> >> lineage) RDD during job retries. I'm not sure what documentation exists
> >> around parallelizing on top of Spark's existing parallelization approach,
> >> but I would venture a guess that that could be the source of your concurrent
> >> access problems, and potentially other hidden issues. Have you tried a
> >> version of your app which doesn't parallelize actions on RDDs, but instead
> >> serially processes each RDD? I'm sure it isn't ideal performance-wise, but
> >> it seems like a good choice for an A/B test.
> >>
> >> The poll.ms issue could very well be settings or capability of your kafka
> >> cluster. I think other (non-Spark) approaches may have less consumer churn
> >> and be less susceptible to things like GC pauses or coordination latency. It
> >> could also be that the number of consumers being simultaneously created on
> >> each executor causes a thundering herd problem during initial phases (which
> >> then causes job retries, which then causes more consumer churn, etc.).
> >>
> >> Sean
> >>
> >>
> >>
> >> On Nov 12, 2016, at 11:14 AM, Ivan von Nagy <iv...@vadio.com> wrote:
> >>
> >> The code was changed to use a unique group for each KafkaRDD that was
> >> created (see Nov 10 post). There is no KafkaRDD being reused. The basic
> >> logic (see Nov 10 post for example) is: get a list of channels, iterate
> >> through them in parallel, load a KafkaRDD using a given topic and a consumer
> >> group that is made from the topic (each RDD uses a different topic and
> >> group), process the data and write to Parquet files.
> >>
> >> Per my Nov 10th post, we still get polling timeouts unless the poll.ms is
> >> set to something like 10 seconds. We also get concurrent modification
> >> exceptions. I believe the key here is that processing data in parallel is
> >> where we encounter issues, so we are looking for some possible answers
> >> surrounding this.
> >>
> >> Thanks,
> >>
> >> Ivan
> >>
> >>
> >> On Fri, Nov 11, 2016 at 12:12 PM, Cody Koeninger <co...@koeninger.org>
> >> wrote:
> >>>
> >>> It is already documented that you must use a different group id, which
> >>> as far as I can tell you are still not doing.
> >>>
> >>>
> >>> On Nov 10, 2016 7:43 PM, "Shixiong(Ryan) Zhu" <shixiong@databricks.com
> >
> >>> wrote:
> >>>>
> >>>> Yeah, the KafkaRDD cannot be reused. It's better to document it.
> >>>>
> >>>> On Thu, Nov 10, 2016 at 8:26 AM, Ivan von Nagy <iv...@vadio.com>
> wrote:
> >>>>>
> >>>>> Ok, I have split the KafkaRDD logic so each uses its own group and
> >>>>> bumped the poll.ms to 10 seconds. Anything less than 2 seconds on the
> >>>>> poll.ms ends up with a timeout and exception, so I am still perplexed on
> >>>>> that one. The new error I am getting now is a
> >>>>> `ConcurrentModificationException` when Spark is trying to remove the
> >>>>> CachedKafkaConsumer.
> >>>>>
> >>>>> java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access
> >>>>> at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1431)
> >>>>> at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:1361)
> >>>>> at org.apache.spark.streaming.kafka010.CachedKafkaConsumer$$anon$1.removeEldestEntry(CachedKafkaConsumer.scala:128)
> >>>>> at java.util.LinkedHashMap.afterNodeInsertion(LinkedHashMap.java:299)
> >>>>>
> >>>>> Here is the basic logic:
> >>>>>
> >>>>> Using KafkaRDD - This takes a list of channels and processes them in
> >>>>> parallel using the KafkaRDD directly. They each use a distinct consumer
> >>>>> group (s"$prefix-$topic"), and each has its own topic and each topic has 4
> >>>>> partitions. We routinely get timeout errors when polling for data when the
> >>>>> poll.ms is less than 2 seconds. This occurs whether or not we process in
> >>>>> parallel.
> >>>>>
> >>>>> Example usage with KafkaRDD:
> >>>>> val channels = Seq("channel1", "channel2")
> >>>>>
> >>>>> channels.toParArray.foreach { channel =>
> >>>>>   val consumer = new KafkaConsumer[K, V](kafkaParams.asJava)
> >>>>>
> >>>>>   // Get offsets for the given topic and the consumer group "$prefix-$topic"
> >>>>>   val offsetRanges = getOffsets(s"$prefix-$topic", channel)
> >>>>>
> >>>>>   val ds = KafkaUtils.createRDD[K, V](context,
> >>>>>         kafkaParams asJava,
> >>>>>         offsetRanges,
> >>>>>         PreferConsistent).toDS[V]
> >>>>>
> >>>>>   // Do some aggregations
> >>>>>   ds.agg(...)
> >>>>>   // Save the data
> >>>>>   ds.write.mode(SaveMode.Append).parquet(somePath)
> >>>>>   // Save offsets using a KafkaConsumer
> >>>>>   consumer.commitSync(newOffsets.asJava)
> >>>>>   consumer.close()
> >>>>> }
> >>>>>
> >>>>> I am not sure why the concurrent issue is there, as I have tried to
> >>>>> debug and also looked at the KafkaConsumer code, and everything looks
> >>>>> like it should not occur. The things to figure out are why this occurs
> >>>>> when running in parallel and why the timeouts still occur.
> >>>>>
> >>>>> Thanks,
> >>>>>
> >>>>> Ivan
> >>>>>
> >>>>> On Mon, Nov 7, 2016 at 11:55 AM, Cody Koeninger <co...@koeninger.org>
> >>>>> wrote:
> >>>>>>
> >>>>>> There definitely is Kafka documentation indicating that you should use
> >>>>>> a different consumer group for logically different subscribers, this
> >>>>>> is really basic to Kafka:
> >>>>>>
> >>>>>> http://kafka.apache.org/documentation#intro_consumers
> >>>>>>
> >>>>>> As for your comment that "commit async after each RDD, which is not
> >>>>>> really viable also", how is it not viable?  Again, committing offsets
> >>>>>> to Kafka doesn't give you reliable delivery semantics unless your
> >>>>>> downstream data store is idempotent.  If your downstream data store is
> >>>>>> idempotent, then it shouldn't matter to you when offset commits
> >>>>>> happen, as long as they happen within a reasonable time after the data
> >>>>>> is written.
> >>>>>>
> >>>>>> Do you want to keep arguing with me, or follow my advice and proceed
> >>>>>> with debugging any remaining issues after you make the changes I
> >>>>>> suggested?
> >>>>>>
> >>>>>> On Mon, Nov 7, 2016 at 1:35 PM, Ivan von Nagy <iv...@vadio.com>
> wrote:
> >>>>>> > With our stream version, we update the offsets for only the partition
> >>>>>> > we are operating on. We even break down the partition into smaller
> >>>>>> > batches and then update the offsets after each batch within the
> >>>>>> > partition. With Spark 1.6 and Kafka 0.8.x this was not an issue, and as
> >>>>>> > Sean pointed out, this is not necessarily a Spark issue since Kafka no
> >>>>>> > longer allows you to simply update the offsets for a given consumer
> >>>>>> > group. You have to subscribe or assign partitions to even do so.
> >>>>>> >
> >>>>>> > As for storing the offsets in some other place like a DB, I don't find
> >>>>>> > this useful because you then can't use tools like Kafka Manager. In
> >>>>>> > order to do so you would have to store in a DB and then circle back and
> >>>>>> > update Kafka afterwards. This means you have to keep two sources in
> >>>>>> > sync, which is not really a good idea.
> >>>>>> >
> >>>>>> > It is a challenge in Spark to use the Kafka offsets since the driver
> >>>>>> > stays subscribed to the topic(s) and consumer group, while the
> >>>>>> > executors prepend "spark-executor-" to the consumer group. The stream
> >>>>>> > (driver) does allow you to commit async after each RDD, which is not
> >>>>>> > really viable also. I have thought of implementing an Akka actor system
> >>>>>> > on the driver and sending it messages from the executor code to update
> >>>>>> > the offsets, but then that is asynchronous as well so not really a good
> >>>>>> > solution.
> >>>>>> >
> >>>>>> > I have no idea why Kafka made this change and also why in the parallel
> >>>>>> > KafkaRDD application we would be advised to use different consumer
> >>>>>> > groups for each RDD. It seems strange to me that different consumer
> >>>>>> > groups would be required or advised. There is no Kafka documentation
> >>>>>> > that I know of that states this. The biggest issue I see with the
> >>>>>> > parallel KafkaRDD is the timeouts. I have tried to set poll.ms to 30
> >>>>>> > seconds and still get the issue. Something is just not right here. As I
> >>>>>> > mentioned with the streaming application, with Spark 1.6 and Kafka
> >>>>>> > 0.8.x we never saw this issue. We have been running the same basic
> >>>>>> > logic for over a year now without one hitch at all.
> >>>>>> >
> >>>>>> > Ivan
> >>>>>> >
> >>>>>> >
> >>>>>> > On Mon, Nov 7, 2016 at 11:16 AM, Cody Koeninger <
> cody@koeninger.org>
> >>>>>> > wrote:
> >>>>>> >>
> >>>>>> >> Someone can correct me, but I'm pretty sure Spark dstreams (in
> >>>>>> >> general, not just kafka) have been progressing on to the next batch
> >>>>>> >> after a given batch aborts for quite some time now.  Yet another
> >>>>>> >> reason I put offsets in my database transactionally.  My jobs throw
> >>>>>> >> exceptions if the offset in the DB isn't what I expected it to be.
> >>>>>> >>
> >>>>>> >>
> >>>>>> >>
> >>>>>> >>
> >>>>>> >> On Mon, Nov 7, 2016 at 1:08 PM, Sean McKibben <
> graphex@graphex.com>
> >>>>>> >> wrote:
> >>>>>> >> > I've been encountering the same kinds of timeout issues as Ivan,
> >>>>>> >> > using the "Kafka Stream" approach that he is using, except I'm
> >>>>>> >> > storing my offsets manually from the driver to Zookeeper in the
> >>>>>> >> > Kafka 8 format. I haven't yet implemented the KafkaRDD approach, and
> >>>>>> >> > therefore don't have the concurrency issues, but a very similar use
> >>>>>> >> > case is coming up for me soon, it's just been backburnered until I
> >>>>>> >> > can get streaming to be more reliable (I will definitely ensure
> >>>>>> >> > unique group IDs when I do). Offset commits are certainly more
> >>>>>> >> > painful in Kafka 0.10, and that doesn't have anything to do with
> >>>>>> >> > Spark.
> >>>>>> >> >
> >>>>>> >> > While I may be able to alleviate the timeout by just increasing it,
> >>>>>> >> > I've noticed something else that is more worrying: When one task
> >>>>>> >> > fails 4 times in a row (i.e. "Failed to get records for _ after
> >>>>>> >> > polling for _"), Spark aborts the Stage and Job with "Job aborted
> >>>>>> >> > due to stage failure: Task _ in stage _ failed 4 times". That's
> >>>>>> >> > fine, and it's the behavior I want, but instead of stopping the
> >>>>>> >> > Application there (as previous versions of Spark did) the next
> >>>>>> >> > microbatch marches on and offsets are committed ahead of the failed
> >>>>>> >> > microbatch. Suddenly my at-least-once app becomes more
> >>>>>> >> > sometimes-at-least-once which is no good. In order for spark to
> >>>>>> >> > display that failure, I must be propagating the errors up to Spark,
> >>>>>> >> > but the behavior of marching forward with the next microbatch seems
> >>>>>> >> > to be new, and a big potential for data loss in streaming
> >>>>>> >> > applications.
> >>>>>> >> >
> >>>>>> >> > Am I perhaps missing a setting to stop the entire streaming
> >>>>>> >> > application once spark.task.maxFailures is reached? Has anyone else
> >>>>>> >> > seen this behavior of a streaming application skipping over failed
> >>>>>> >> > microbatches?
> >>>>>> >> >
> >>>>>> >> > Thanks,
> >>>>>> >> > Sean
> >>>>>> >> >
> >>>>>> >> >
> >>>>>> >> >> On Nov 4, 2016, at 2:48 PM, Cody Koeninger <
> cody@koeninger.org>
> >>>>>> >> >> wrote:
> >>>>>> >> >>
> >>>>>> >> >> So basically what I am saying is
> >>>>>> >> >>
> >>>>>> >> >> - increase poll.ms
> >>>>>> >> >> - use a separate group id everywhere
> >>>>>> >> >> - stop committing offsets under the covers
> >>>>>> >> >>
> >>>>>> >> >> That should eliminate all of those as possible causes, and then
> >>>>>> >> >> we can see if there are still issues.
> >>>>>> >> >>
> >>>>>> >> >> As far as 0.8 vs 0.10, Spark doesn't require you to assign or
> >>>>>> >> >> subscribe to a topic in order to update offsets, Kafka does.  If
> >>>>>> >> >> you don't like the new Kafka consumer api, the existing 0.8 simple
> >>>>>> >> >> consumer api should be usable with later brokers.  As long as you
> >>>>>> >> >> don't need SSL or dynamic subscriptions, and it meets your needs,
> >>>>>> >> >> keep using it.
> >>>>>> >> >>
> >>>>>> >> >> On Fri, Nov 4, 2016 at 3:37 PM, Ivan von Nagy <ivan@vadio.com
> >
> >>>>>> >> >> wrote:
> >>>>>> >> >>> Yes, the parallel KafkaRDD uses the same consumer group, but each
> >>>>>> >> >>> RDD uses a single distinct topic. For example, the group would be
> >>>>>> >> >>> something like "storage-group", and the topics would be
> >>>>>> >> >>> "storage-channel1" and "storage-channel2". In each thread a
> >>>>>> >> >>> KafkaConsumer is started and assigned its partitions, and then
> >>>>>> >> >>> offsets are committed after the RDD is processed. This should not
> >>>>>> >> >>> interfere with the consumer group used by the executors, which
> >>>>>> >> >>> would be "spark-executor-storage-group".
> >>>>>> >> >>>
> >>>>>> >> >>> In the streaming example there is a single topic ("client-events")
> >>>>>> >> >>> and group ("processing-group"). A single stream is created and
> >>>>>> >> >>> offsets are manually updated from the executor after each
> >>>>>> >> >>> partition is handled. This was a challenge since Spark now
> >>>>>> >> >>> requires one to assign or subscribe to a topic in order to even
> >>>>>> >> >>> update the offsets. In 0.8.2.x you did not have to worry about
> >>>>>> >> >>> that. This approach limits your exposure to duplicate data since
> >>>>>> >> >>> idempotent records are not entirely possible in our scenario, at
> >>>>>> >> >>> least without a lot of re-running of logic to de-dup.
> >>>>>> >> >>>
> >>>>>> >> >>> Thanks,
> >>>>>> >> >>>
> >>>>>> >> >>> Ivan
> >>>>>> >> >>>
> >>>>>> >> >>> On Fri, Nov 4, 2016 at 1:24 PM, Cody Koeninger
> >>>>>> >> >>> <co...@koeninger.org>
> >>>>>> >> >>> wrote:
> >>>>>> >> >>>>
> >>>>>> >> >>>> So just to be clear, the answers to my questions are
> >>>>>> >> >>>>
> >>>>>> >> >>>> - you are not using different group ids, you're using the same
> >>>>>> >> >>>> group id everywhere
> >>>>>> >> >>>>
> >>>>>> >> >>>> - you are committing offsets manually
> >>>>>> >> >>>>
> >>>>>> >> >>>> Right?
> >>>>>> >> >>>>
> >>>>>> >> >>>> If you want to eliminate network or kafka misbehavior as a source,
> >>>>>> >> >>>> tune poll.ms upwards even higher.
> >>>>>> >> >>>>
> >>>>>> >> >>>> You must use different group ids for different rdds or streams.
> >>>>>> >> >>>> Kafka consumers won't behave the way you expect if they are all in
> >>>>>> >> >>>> the same group id, and the consumer cache is keyed by group id.
> >>>>>> >> >>>> Yes, the executor will tack "spark-executor-" on to the beginning,
> >>>>>> >> >>>> but if you give it the same base group id, it will be the same.
> >>>>>> >> >>>> And the driver will use the group id you gave it, unmodified.
> >>>>>> >> >>>>
> >>>>>> >> >>>> Finally, I really can't help you if you're manually writing your
> >>>>>> >> >>>> own code to commit offsets directly to Kafka.  Trying to minimize
> >>>>>> >> >>>> duplicates that way doesn't really make sense, your system must be
> >>>>>> >> >>>> able to handle duplicates if you're using kafka as an offsets
> >>>>>> >> >>>> store, it can't do transactional exactly once.
> >>>>>> >> >>>>
> >>>>>> >> >>>> On Fri, Nov 4, 2016 at 1:48 PM, vonnagy <iv...@vadio.com>
> >>>>>> >> >>>> wrote:
> >>>>>> >> >>>>> Here are some examples and details of the scenarios. The KafkaRDD
> >>>>>> >> >>>>> is the most error prone to polling timeouts and concurrent
> >>>>>> >> >>>>> modification errors.
> >>>>>> >> >>>>>
> >>>>>> >> >>>>> *Using KafkaRDD* - This takes a list of channels and processes
> >>>>>> >> >>>>> them in parallel using the KafkaRDD directly. They all use the
> >>>>>> >> >>>>> same consumer group ('storage-group'), but each has its own topic
> >>>>>> >> >>>>> and each topic has 4 partitions. We routinely get timeout errors
> >>>>>> >> >>>>> when polling for data. This occurs whether we process in parallel
> >>>>>> >> >>>>> or sequentially.
> >>>>>> >> >>>>>
> >>>>>> >> >>>>> *Spark Kafka setting:*
> >>>>>> >> >>>>> spark.streaming.kafka.consumer.poll.ms=2000
> >>>>>> >> >>>>>
> >>>>>> >> >>>>> *Kafka Consumer Params:*
> >>>>>> >> >>>>> metric.reporters = []
> >>>>>> >> >>>>> metadata.max.age.ms = 300000
> >>>>>> >> >>>>> partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor]
> >>>>>> >> >>>>> reconnect.backoff.ms = 50
> >>>>>> >> >>>>> sasl.kerberos.ticket.renew.window.factor = 0.8
> >>>>>> >> >>>>> max.partition.fetch.bytes = 1048576
> >>>>>> >> >>>>> bootstrap.servers = [somemachine:31000]
> >>>>>> >> >>>>> ssl.keystore.type = JKS
> >>>>>> >> >>>>> enable.auto.commit = false
> >>>>>> >> >>>>> sasl.mechanism = GSSAPI
> >>>>>> >> >>>>> interceptor.classes = null
> >>>>>> >> >>>>> exclude.internal.topics = true
> >>>>>> >> >>>>> ssl.truststore.password = null
> >>>>>> >> >>>>> client.id =
> >>>>>> >> >>>>> ssl.endpoint.identification.algorithm = null
> >>>>>> >> >>>>> max.poll.records = 1000
> >>>>>> >> >>>>> check.crcs = true
> >>>>>> >> >>>>> request.timeout.ms = 40000
> >>>>>> >> >>>>> heartbeat.interval.ms = 3000
> >>>>>> >> >>>>> auto.commit.interval.ms = 5000
> >>>>>> >> >>>>> receive.buffer.bytes = 65536
> >>>>>> >> >>>>> ssl.truststore.type = JKS
> >>>>>> >> >>>>> ssl.truststore.location = null
> >>>>>> >> >>>>> ssl.keystore.password = null
> >>>>>> >> >>>>> fetch.min.bytes = 1
> >>>>>> >> >>>>> send.buffer.bytes = 131072
> >>>>>> >> >>>>> value.deserializer = class com.vadio.analytics.spark.storage.ClientEventJsonOptionDeserializer
> >>>>>> >> >>>>> group.id = storage-group
> >>>>>> >> >>>>> retry.backoff.ms = 100
> >>>>>> >> >>>>> sasl.kerberos.kinit.cmd = /usr/bin/kinit
> >>>>>> >> >>>>> sasl.kerberos.service.name = null
> >>>>>> >> >>>>> sasl.kerberos.ticket.renew.jitter = 0.05
> >>>>>> >> >>>>> ssl.trustmanager.algorithm = PKIX
> >>>>>> >> >>>>> ssl.key.password = null
> >>>>>> >> >>>>> fetch.max.wait.ms = 500
> >>>>>> >> >>>>> sasl.kerberos.min.time.before.relogin = 60000
> >>>>>> >> >>>>> connections.max.idle.ms = 540000
> >>>>>> >> >>>>> session.timeout.ms = 30000
> >>>>>> >> >>>>> metrics.num.samples = 2
> >>>>>> >> >>>>> key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
> >>>>>> >> >>>>> ssl.protocol = TLS
> >>>>>> >> >>>>> ssl.provider = null
> >>>>>> >> >>>>> ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
> >>>>>> >> >>>>> ssl.keystore.location = null
> >>>>>> >> >>>>> ssl.cipher.suites = null
> >>>>>> >> >>>>> security.protocol = PLAINTEXT
> >>>>>> >> >>>>> ssl.keymanager.algorithm = SunX509
> >>>>>> >> >>>>> metrics.sample.window.ms = 30000
> >>>>>> >> >>>>> auto.offset.reset = earliest
> >>>>>> >> >>>>>
> >>>>>> >> >>>>> *Example usage with KafkaRDD:*
> >>>>>> >> >>>>> val channels = Seq("channel1", "channel2")
> >>>>>> >> >>>>>
> >>>>>> >> >>>>> channels.toParArray.foreach { channel =>
> >>>>>> >> >>>>>  val consumer = new KafkaConsumer[K, V](kafkaParams.asJava)
> >>>>>> >> >>>>>
> >>>>>> >> >>>>>  // Get offsets for the given topic and the consumer group 'storage-group'
> >>>>>> >> >>>>>  val offsetRanges = getOffsets("storage-group", channel)
> >>>>>> >> >>>>>
> >>>>>> >> >>>>>  val ds = KafkaUtils.createRDD[K, V](context,
> >>>>>> >> >>>>>        kafkaParams asJava,
> >>>>>> >> >>>>>        offsetRanges,
> >>>>>> >> >>>>>        PreferConsistent).toDS[V]
> >>>>>> >> >>>>>
> >>>>>> >> >>>>>  // Do some aggregations
> >>>>>> >> >>>>>  ds.agg(...)
> >>>>>> >> >>>>>  // Save the data
> >>>>>> >> >>>>>  ds.write.mode(SaveMode.Append).parquet(somePath)
> >>>>>> >> >>>>>  // Save offsets using a KafkaConsumer
> >>>>>> >> >>>>>  consumer.commitSync(newOffsets.asJava)
> >>>>>> >> >>>>>  consumer.close()
> >>>>>> >> >>>>> }
> >>>>>> >> >>>>>
> >>>>>> >> >>>>>
> >>>>>> >> >>>>> *Example usage with Kafka Stream:*
> >>>>>> >> >>>>> This creates a stream and processes events in each partition. At
> >>>>>> >> >>>>> the end of processing for each partition, we update the offsets
> >>>>>> >> >>>>> for that partition. This is challenging to do, but is better than
> >>>>>> >> >>>>> calling commitAsync on the stream, because that occurs after the
> >>>>>> >> >>>>> /entire/ RDD has been processed. This method minimizes duplicates
> >>>>>> >> >>>>> in an exactly once environment. Since the executors use their own
> >>>>>> >> >>>>> custom group "spark-executor-processor-group" and the commit is
> >>>>>> >> >>>>> buried in private functions, we are unable to use the executor's
> >>>>>> >> >>>>> cached consumer to update the offsets. This requires us to go
> >>>>>> >> >>>>> through multiple steps to update the Kafka offsets accordingly.
> >>>>>> >> >>>>>
> >>>>>> >> >>>>> val offsetRanges = getOffsets("processor-group", "my-topic")
> >>>>>> >> >>>>>
> >>>>>> >> >>>>> val stream = KafkaUtils.createDirectStream[K, V](context,
> >>>>>> >> >>>>>      PreferConsistent,
> >>>>>> >> >>>>>      Subscribe[K, V](Seq("my-topic") asJavaCollection,
> >>>>>> >> >>>>>        kafkaParams,
> >>>>>> >> >>>>>        offsetRanges))
> >>>>>> >> >>>>>
> >>>>>> >> >>>>> stream.foreachRDD { rdd =>
> >>>>>> >> >>>>>    val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
> >>>>>> >> >>>>>
> >>>>>> >> >>>>>    // Transform our data
> >>>>>> >> >>>>>   rdd.foreachPartition { events =>
> >>>>>> >> >>>>>       // Establish a consumer in the executor so we can update
> >>>>>> >> >>>>>       // offsets after each partition.
> >>>>>> >> >>>>>       // This class is homegrown and uses the KafkaConsumer to help
> >>>>>> >> >>>>>       // get/set offsets
> >>>>>> >> >>>>>       val consumer = new ConsumerUtils(kafkaParams)
> >>>>>> >> >>>>>       // do something with our data
> >>>>>> >> >>>>>
> >>>>>> >> >>>>>       // Write the offsets that were updated in this partition
> >>>>>> >> >>>>>       consumer.setConsumerOffsets("processor-group",
> >>>>>> >> >>>>>          Map(TopicAndPartition(tp.topic, tp.partition) -> endOffset))
> >>>>>> >> >>>>>   }
> >>>>>> >> >>>>> }
> >>>>>> >> >>>>>
> >>>>>> >> >>>>>
> >>>>>> >> >>>>>
> >>>>>> >> >>>>> --
> >>>>>> >> >>>>> View this message in context:
> >>>>>> >> >>>>>
> >>>>>> >> >>>>>
> >>>>>> >> >>>>> http://apache-spark-user-list.1001560.n3.nabble.com/Instability-issues-with-Spark-2-0-1-and-Kafka-0-10-tp28017p28020.html
> >>>>>> >> >>>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
> >>>>>> >> >>>>>
> >>>>>> >> >>>>>
> >>>>>> >> >>>>>
> >>>>>> >> >>>>> ---------------------------------------------------------------------
> >>>>>> >> >>>>> To unsubscribe e-mail: user-unsubscribe@spark.apache.org
> >>>>>> >> >>>>>
> >>>>>> >> >>>
> >>>>>> >> >>>
> >>>>>> >> >>
> >>>>>> >> >>
> >>>>>> >> >> ---------------------------------------------------------------------
> >>>>>> >> >> To unsubscribe e-mail: user-unsubscribe@spark.apache.org
> >>>>>> >> >>
> >>>>>> >> >
> >>>>>> >
> >>>>>> >
> >>>>>
> >>>>>
> >>>>
> >>
> >>
> >
>

Re: Instability issues with Spark 2.0.1 and Kafka 0.10

Posted by Cody Koeninger <co...@koeninger.org>.
You should not be getting consumer churn on executors at all, that's
the whole point of the cache.  How many partitions are you trying to
process per executor?

http://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html#locationstrategies

gives instructions on the default size of the cache and how to increase it.
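
As a rough sketch of what bumping the cache could look like when the job
builds its own SparkConf: the property names below are the ones the Kafka 0.10
integration reads for its consumer cache and poll timeout, while the object
name, the app name, and the specific values are placeholders chosen only for
illustration, not recommendations. It needs spark-core on the classpath.

```scala
import org.apache.spark.SparkConf

object KafkaCacheConf {
  // Placeholder values: size maxCapacity to the number of distinct
  // group/topic/partition combinations a single executor is expected to hold.
  val conf: SparkConf = new SparkConf()
    .setAppName("kafka-rdd-job") // hypothetical application name
    .set("spark.streaming.kafka.consumer.cache.initialCapacity", "64")
    .set("spark.streaming.kafka.consumer.cache.maxCapacity", "256")
    // Poll timeout discussed earlier in this thread, in milliseconds.
    .set("spark.streaming.kafka.consumer.poll.ms", "10000")
}
```

The same keys can also be passed with --conf on spark-submit instead of being
set in code.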

On Sat, Nov 12, 2016 at 2:15 PM, Ivan von Nagy <iv...@vadio.com> wrote:
> Hi Sean,
>
> Thanks for responding. We have run our jobs with internal parallel
> processing for well over a year (Spark 1.5, 1.6 and Kafka 0.8.2.2.) and did
> not encounter any of these issues until upgrading to Spark 2.0.1 and Kafka
> 0.10 clients. If we process serially, then we sometimes get the errors, but
> far less often. Also, if done sequentially it sometimes takes more than 2x
> as long, which is not an option for this particular job.
>
> I posted another example on Nov 10th which is the example below. We
> basically iterate through a list in parallel and sometimes the list could be
> upwards of a hundred elements. The parallelism in Scala/Spark limits to
> about 8 at a time on our nodes. For performance reasons we process in
> parallel and we also separate each since each channel has their own topic.
> We don't combine all into one KafkaRDD because that means we have to process
> all or nothing if an error occurs. This way if a couple of channels fail, we
> can re-run the job and it will only process those channels.
>
> This has just been perplexing since we had never encountered any errors for
> well over a year using the prior versions. At this time, I am just seeking
> any configuration options or code changes that we may be missing or even at
> a lower level, fundamentally what changed in Spark 2 and Kafka 0.10 that
> surfaced these issues.
>
> We continue to use Spark 1.6 with the Kafka 0.8.x clients until this can be
> figured out; however, it is a deal breaker for us to upgrade to Spark 2.x
> with Kafka 0.10 clients. On a side note, we have not encountered any issues
> with the Kafka Producers, this is simply with the KafkaRDD and its use of
> CachedKafkaConsumer. Any help is much appreciated.
>
> Thanks,
>
> Ivan
>
> Example usage with KafkaRDD:
> val channels = Seq("channel1", "channel2")
>
> channels.toParArray.foreach { channel =>
>   val consumer = new KafkaConsumer[K, V](kafkaParams.asJava)
>
>   // Get offsets for the given topic and the consumer group "$prefix-$topic"
>   val offsetRanges = getOffsets(s"$prefix-$topic", channel)
>
>   val ds = KafkaUtils.createRDD[K, V](context,
>         kafkaParams asJava,
>         offsetRanges,
>         PreferConsistent).toDS[V]
>
>   // Do some aggregations
>   ds.agg(...)
>   // Save the data
>   ds.write.mode(SaveMode.Append).parquet(somePath)
>   // Save offsets using a KafkaConsumer
>   consumer.commitSync(newOffsets.asJava)
>   consumer.close()
> }
>
> On Sat, Nov 12, 2016 at 11:46 AM, Sean McKibben <gr...@graphex.com> wrote:
>>
>> How are you iterating through your RDDs in parallel? In the past (Spark
>> 1.5.2) when I've had actions being performed on multiple RDDs concurrently
>> using futures, I've encountered some pretty bad behavior in Spark,
>> especially during job retries. Very difficult to explain things, like
>> records from one RDD leaking into a totally different (though shared
>> lineage) RDD during job retries. I'm not sure what documentation exists
>> around parallelizing on top of Spark's existing parallelization approach,
>> but I would venture a guess that that could be the source of your concurrent
>> access problems, and potentially other hidden issues. Have you tried a
>> version of your app which doesn't parallelize actions on RDDs, but instead
>> serially processes each RDD? I'm sure it isn't ideal performance-wise, but
>> it seems like a good choice for an A/B test.
>>
>> The poll.ms issue could very well be settings or capability of your kafka
>> cluster. I think other (non-Spark) approaches may have less consumer churn
>> and be less susceptible to things like GC pauses or coordination latency. It
>> could also be that the number of consumers being simultaneously created on
>> each executor causes a thundering herd problem during initial phases (which
>> then causes job retries, which then causes more consumer churn, etc.).
>>
>> Sean
>>
>>
>>
>> On Nov 12, 2016, at 11:14 AM, Ivan von Nagy <iv...@vadio.com> wrote:
>>
>> The code was changed to use a unique group for each KafkaRDD that was
>> created (see Nov 10 post). There is no KafkaRDD being reused. The basic
>> logic (see Nov 10 post for example) is get a list of channels, iterate
>> through them in parallel, load a KafkaRDD using a given topic and a consumer
>> group that is made from the topic (each RDD uses a different topic and
>> group), process the data and write to Parquet files.
>>
>> Per my Nov 10th post, we still get polling timeouts unless the poll.ms is
>> set to something like 10 seconds. We also get concurrent modification
>> exceptions as well. I believe the key here is the processing of data in
>> parallel is where we encounter issues so we are looking for some possible
>> answers surrounding this.
>>
>> Thanks,
>>
>> Ivan
>>
>>
>> On Fri, Nov 11, 2016 at 12:12 PM, Cody Koeninger <co...@koeninger.org>
>> wrote:
>>>
>>> It is already documented that you must use a different group id, which as
>>> far as I can tell you are still not doing.
>>>
>>>
>>> On Nov 10, 2016 7:43 PM, "Shixiong(Ryan) Zhu" <sh...@databricks.com>
>>> wrote:
>>>>
>>>> Yeah, the KafkaRDD cannot be reused. It's better to document it.
>>>>
>>>> On Thu, Nov 10, 2016 at 8:26 AM, Ivan von Nagy <iv...@vadio.com> wrote:
>>>>>
>>>>> Ok, I have split the KafkaRDD logic so that each uses its own group and
>>>>> bumped the poll.ms to 10 seconds. Anything less than 2 seconds on the
>>>>> poll.ms ends up with a timeout and exception so I am still perplexed on that
>>>>> one. The new error I am getting now is a `ConcurrentModificationException`
>>>>> when Spark is trying to remove the CachedKafkaConsumer.
>>>>>
>>>>> java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access
>>>>> at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1431)
>>>>> at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:1361)
>>>>> at org.apache.spark.streaming.kafka010.CachedKafkaConsumer$$anon$1.removeEldestEntry(CachedKafkaConsumer.scala:128)
>>>>> at java.util.LinkedHashMap.afterNodeInsertion(LinkedHashMap.java:299)
>>>>>
>>>>> Here is the basic logic:
>>>>>
>>>>> Using KafkaRDD - This takes a list of channels and processes them in
>>>>> parallel using the KafkaRDD directly. They each use a distinct consumer
>>>>> group (s"$prefix-$topic"), and each has its own topic and each topic has 4
>>>>> partitions. We routinely get timeout errors when polling for data when the
>>>>> poll.ms is less than 2 seconds. This occurs whether or not we process in parallel.
>>>>>
>>>>> Example usage with KafkaRDD:
>>>>> val channels = Seq("channel1", "channel2")
>>>>>
>>>>> channels.toParArray.foreach { channel =>
>>>>>   val consumer = new KafkaConsumer[K, V](kafkaParams.asJava)
>>>>>
>>>>>   // Get offsets for the given topic and the consumer group "$prefix-$topic"
>>>>>   val offsetRanges = getOffsets(s"$prefix-$topic", channel)
>>>>>
>>>>>   val ds = KafkaUtils.createRDD[K, V](context,
>>>>>         kafkaParams asJava,
>>>>>         offsetRanges,
>>>>>         PreferConsistent).toDS[V]
>>>>>
>>>>>   // Do some aggregations
>>>>>   ds.agg(...)
>>>>>   // Save the data
>>>>>   ds.write.mode(SaveMode.Append).parquet(somePath)
>>>>>   // Save offsets using a KafkaConsumer
>>>>>   consumer.commitSync(newOffsets.asJava)
>>>>>   consumer.close()
>>>>> }
>>>>>
>>>>> I am not sure why the concurrency issue is there. I have tried to debug it
>>>>> and have also looked at the KafkaConsumer code, and as far as I can tell it
>>>>> should not occur. What remains to be figured out is why this occurs when
>>>>> running in parallel, and why the timeouts still occur.
>>>>>
>>>>> Thanks,
>>>>>
>>>>> Ivan
>>>>>
>>>>> On Mon, Nov 7, 2016 at 11:55 AM, Cody Koeninger <co...@koeninger.org>
>>>>> wrote:
>>>>>>
>>>>>> There definitely is Kafka documentation indicating that you should use
>>>>>> a different consumer group for logically different subscribers, this
>>>>>> is really basic to Kafka:
>>>>>>
>>>>>> http://kafka.apache.org/documentation#intro_consumers
>>>>>>
>>>>>> As for your comment that "commit async after each RDD, which is not
>>>>>> really viable also", how is it not viable?  Again, committing offsets
>>>>>> to Kafka doesn't give you reliable delivery semantics unless your
>>>>>> downstream data store is idempotent.  If your downstream data store is
>>>>>> idempotent, then it shouldn't matter to you when offset commits
>>>>>> happen, as long as they happen within a reasonable time after the data
>>>>>> is written.
>>>>>>
>>>>>> Do you want to keep arguing with me, or follow my advice and proceed
>>>>>> with debugging any remaining issues after you make the changes I
>>>>>> suggested?
>>>>>>
>>>>>> On Mon, Nov 7, 2016 at 1:35 PM, Ivan von Nagy <iv...@vadio.com> wrote:
>>>>>> > With our stream version, we update the offsets for only the partition
>>>>>> > we are operating on. We even break down the partition into smaller
>>>>>> > batches and then update the offsets after each batch within the
>>>>>> > partition. With Spark 1.6 and Kafka 0.8.x this was not an issue, and as
>>>>>> > Sean pointed out, this is not necessarily a Spark issue since Kafka no
>>>>>> > longer allows you to simply update the offsets for a given consumer
>>>>>> > group. You have to subscribe or assign partitions to even do so.
>>>>>> >
>>>>>> > As for storing the offsets in some other place like a DB, I don't find
>>>>>> > this useful because you then can't use tools like Kafka Manager. In
>>>>>> > order to do so you would have to store them in a DB and then circle
>>>>>> > back and update Kafka afterwards. This means you have to keep two
>>>>>> > sources in sync, which is not really a good idea.
>>>>>> >
>>>>>> > It is a challenge in Spark to use the Kafka offsets since the driver
>>>>>> > stays subscribed to the topic(s) and consumer group, while the
>>>>>> > executors prepend "spark-executor-" to the consumer group. The stream
>>>>>> > (driver) does allow you to commit async after each RDD, which is not
>>>>>> > really viable also. I have thought of implementing an Akka actor
>>>>>> > system on the driver and sending it messages from the executor code to
>>>>>> > update the offsets, but then that is asynchronous as well, so not
>>>>>> > really a good solution.
>>>>>> >
>>>>>> > I have no idea why Kafka made this change and also why in the parallel
>>>>>> > KafkaRDD application we would be advised to use different consumer
>>>>>> > groups for each RDD. It seems strange to me that different consumer
>>>>>> > groups would be required or advised. There is no Kafka documentation
>>>>>> > that I know of that states this. The biggest issue I see with the
>>>>>> > parallel KafkaRDD is the timeouts. I have tried to set poll.ms to 30
>>>>>> > seconds and still get the issue. Something is not right here. As I
>>>>>> > mentioned with the streaming application, with Spark 1.6 and Kafka
>>>>>> > 0.8.x we never saw this issue. We have been running the same basic
>>>>>> > logic for over a year now without one hitch at all.
>>>>>> >
>>>>>> > Ivan
>>>>>> >
>>>>>> >
>>>>>> > On Mon, Nov 7, 2016 at 11:16 AM, Cody Koeninger <co...@koeninger.org>
>>>>>> > wrote:
>>>>>> >>
>>>>>> >> Someone can correct me, but I'm pretty sure Spark dstreams (in
>>>>>> >> general, not just kafka) have been progressing on to the next batch
>>>>>> >> after a given batch aborts for quite some time now.  Yet another
>>>>>> >> reason I put offsets in my database transactionally.  My jobs throw
>>>>>> >> exceptions if the offset in the DB isn't what I expected it to be.
>>>>>> >>
>>>>>> >>
>>>>>> >>
>>>>>> >>
>>>>>> >> On Mon, Nov 7, 2016 at 1:08 PM, Sean McKibben <gr...@graphex.com>
>>>>>> >> wrote:
>>>>>> >> > I've been encountering the same kinds of timeout issues as Ivan,
>>>>>> >> > using the "Kafka Stream" approach that he is using, except I'm
>>>>>> >> > storing my offsets manually from the driver to Zookeeper in the
>>>>>> >> > Kafka 8 format. I haven't yet implemented the KafkaRDD approach,
>>>>>> >> > and therefore don't have the concurrency issues, but a very
>>>>>> >> > similar use case is coming up for me soon, it's just been
>>>>>> >> > backburnered until I can get streaming to be more reliable (I will
>>>>>> >> > definitely ensure unique group IDs when I do). Offset commits are
>>>>>> >> > certainly more painful in Kafka 0.10, and that doesn't have
>>>>>> >> > anything to do with Spark.
>>>>>> >> >
>>>>>> >> > While I may be able to alleviate the timeout by just increasing
>>>>>> >> > it, I've noticed something else that is more worrying: When one
>>>>>> >> > task fails 4 times in a row (i.e. "Failed to get records for _
>>>>>> >> > after polling for _"), Spark aborts the Stage and Job with "Job
>>>>>> >> > aborted due to stage failure: Task _ in stage _ failed 4 times".
>>>>>> >> > That's fine, and it's the behavior I want, but instead of stopping
>>>>>> >> > the Application there (as previous versions of Spark did) the next
>>>>>> >> > microbatch marches on and offsets are committed ahead of the
>>>>>> >> > failed microbatch. Suddenly my at-least-once app becomes more of a
>>>>>> >> > sometimes-at-least-once app, which is no good. In order for Spark
>>>>>> >> > to display that failure, I must be propagating the errors up to
>>>>>> >> > Spark, but the behavior of marching forward with the next
>>>>>> >> > microbatch seems to be new, and a big potential for data loss in
>>>>>> >> > streaming applications.
>>>>>> >> >
>>>>>> >> > Am I perhaps missing a setting to stop the entire streaming
>>>>>> >> > application once spark.task.maxFailures is reached? Has anyone
>>>>>> >> > else seen this behavior of a streaming application skipping over
>>>>>> >> > failed microbatches?
>>>>>> >> >
>>>>>> >> > Thanks,
>>>>>> >> > Sean
>>>>>> >> >
>>>>>> >> >
>>>>>> >> >> On Nov 4, 2016, at 2:48 PM, Cody Koeninger <co...@koeninger.org>
>>>>>> >> >> wrote:
>>>>>> >> >>
>>>>>> >> >> So basically what I am saying is
>>>>>> >> >>
>>>>>> >> >> - increase poll.ms
>>>>>> >> >> - use a separate group id everywhere
>>>>>> >> >> - stop committing offsets under the covers
>>>>>> >> >>
>>>>>> >> >> That should eliminate all of those as possible causes, and then
>>>>>> >> >> we can see if there are still issues.
>>>>>> >> >>
>>>>>> >> >> As far as 0.8 vs 0.10, Spark doesn't require you to assign or
>>>>>> >> >> subscribe to a topic in order to update offsets, Kafka does.  If
>>>>>> >> >> you don't like the new Kafka consumer api, the existing 0.8
>>>>>> >> >> simple consumer api should be usable with later brokers.  As long
>>>>>> >> >> as you don't need SSL or dynamic subscriptions, and it meets your
>>>>>> >> >> needs, keep using it.
>>>>>> >> >>
>>>>>> >> >> On Fri, Nov 4, 2016 at 3:37 PM, Ivan von Nagy <iv...@vadio.com>
>>>>>> >> >> wrote:
>>>>>> >> >>> Yes, the parallel KafkaRDD uses the same consumer group, but
>>>>>> >> >>> each RDD uses a single distinct topic. For example, the group
>>>>>> >> >>> would be something like "storage-group", and the topics would be
>>>>>> >> >>> "storage-channel1" and "storage-channel2". In each thread a
>>>>>> >> >>> KafkaConsumer is started and assigned its partitions, and then
>>>>>> >> >>> the offsets are committed after the RDD is processed. This
>>>>>> >> >>> should not interfere with the consumer group used by the
>>>>>> >> >>> executors, which would be "spark-executor-storage-group".
>>>>>> >> >>>
>>>>>> >> >>> In the streaming example there is a single topic
>>>>>> >> >>> ("client-events") and group ("processing-group"). A single
>>>>>> >> >>> stream is created and offsets are manually updated from the
>>>>>> >> >>> executor after each partition is handled. This was a challenge
>>>>>> >> >>> since Spark now requires one to assign or subscribe to a topic
>>>>>> >> >>> in order to even update the offsets. In 0.8.2.x you did not have
>>>>>> >> >>> to worry about that. This approach limits your exposure to
>>>>>> >> >>> duplicate data since idempotent records are not entirely
>>>>>> >> >>> possible in our scenario, at least without a lot of re-running
>>>>>> >> >>> of logic to de-dup.
>>>>>> >> >>>
>>>>>> >> >>> Thanks,
>>>>>> >> >>>
>>>>>> >> >>> Ivan
>>>>>> >> >>>
>>>>>> >> >>> On Fri, Nov 4, 2016 at 1:24 PM, Cody Koeninger
>>>>>> >> >>> <co...@koeninger.org>
>>>>>> >> >>> wrote:
>>>>>> >> >>>>
>>>>>> >> >>>> So just to be clear, the answers to my questions are
>>>>>> >> >>>>
>>>>>> >> >>>> - you are not using different group ids, you're using the same
>>>>>> >> >>>> group id everywhere
>>>>>> >> >>>>
>>>>>> >> >>>> - you are committing offsets manually
>>>>>> >> >>>>
>>>>>> >> >>>> Right?
>>>>>> >> >>>>
>>>>>> >> >>>> If you want to eliminate network or kafka misbehavior as a
>>>>>> >> >>>> source, tune poll.ms upwards even higher.
>>>>>> >> >>>>
>>>>>> >> >>>> You must use different group ids for different rdds or
>>>>>> >> >>>> streams.  Kafka consumers won't behave the way you expect if
>>>>>> >> >>>> they are all in the same group id, and the consumer cache is
>>>>>> >> >>>> keyed by group id.  Yes, the executor will tack
>>>>>> >> >>>> "spark-executor-" on to the beginning, but if you give it the
>>>>>> >> >>>> same base group id, it will be the same.  And the driver will
>>>>>> >> >>>> use the group id you gave it, unmodified.
>>>>>> >> >>>>
>>>>>> >> >>>> Finally, I really can't help you if you're manually writing
>>>>>> >> >>>> your own code to commit offsets directly to Kafka.  Trying to
>>>>>> >> >>>> minimize duplicates that way doesn't really make sense, your
>>>>>> >> >>>> system must be able to handle duplicates if you're using kafka
>>>>>> >> >>>> as an offsets store, it can't do transactional exactly once.
>>>>>> >> >>>>
>>>>>> >> >>>> On Fri, Nov 4, 2016 at 1:48 PM, vonnagy <iv...@vadio.com>
>>>>>> >> >>>> wrote:
>>>>>> >> >>>>> Here are some examples and details of the scenarios. The
>>>>>> >> >>>>> KafkaRDD is the most prone to polling timeouts and concurrent
>>>>>> >> >>>>> modification errors.
>>>>>> >> >>>>>
>>>>>> >> >>>>> *Using KafkaRDD* - This takes a list of channels and processes
>>>>>> >> >>>>> them in parallel using the KafkaRDD directly. They all use the
>>>>>> >> >>>>> same consumer group ('storage-group'), but each has its own
>>>>>> >> >>>>> topic and each topic has 4 partitions. We routinely get
>>>>>> >> >>>>> timeout errors when polling for data. This occurs whether we
>>>>>> >> >>>>> process in parallel or sequentially.
>>>>>> >> >>>>>
>>>>>> >> >>>>> *Spark Kafka setting:*
>>>>>> >> >>>>> spark.streaming.kafka.consumer.poll.ms=2000
>>>>>> >> >>>>>
>>>>>> >> >>>>> *Kafka Consumer Params:*
>>>>>> >> >>>>> metric.reporters = []
>>>>>> >> >>>>> metadata.max.age.ms = 300000
>>>>>> >> >>>>> partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor]
>>>>>> >> >>>>> reconnect.backoff.ms = 50
>>>>>> >> >>>>> sasl.kerberos.ticket.renew.window.factor = 0.8
>>>>>> >> >>>>> max.partition.fetch.bytes = 1048576
>>>>>> >> >>>>> bootstrap.servers = [somemachine:31000]
>>>>>> >> >>>>> ssl.keystore.type = JKS
>>>>>> >> >>>>> enable.auto.commit = false
>>>>>> >> >>>>> sasl.mechanism = GSSAPI
>>>>>> >> >>>>> interceptor.classes = null
>>>>>> >> >>>>> exclude.internal.topics = true
>>>>>> >> >>>>> ssl.truststore.password = null
>>>>>> >> >>>>> client.id =
>>>>>> >> >>>>> ssl.endpoint.identification.algorithm = null
>>>>>> >> >>>>> max.poll.records = 1000
>>>>>> >> >>>>> check.crcs = true
>>>>>> >> >>>>> request.timeout.ms = 40000
>>>>>> >> >>>>> heartbeat.interval.ms = 3000
>>>>>> >> >>>>> auto.commit.interval.ms = 5000
>>>>>> >> >>>>> receive.buffer.bytes = 65536
>>>>>> >> >>>>> ssl.truststore.type = JKS
>>>>>> >> >>>>> ssl.truststore.location = null
>>>>>> >> >>>>> ssl.keystore.password = null
>>>>>> >> >>>>> fetch.min.bytes = 1
>>>>>> >> >>>>> send.buffer.bytes = 131072
>>>>>> >> >>>>> value.deserializer = class com.vadio.analytics.spark.storage.ClientEventJsonOptionDeserializer
>>>>>> >> >>>>> group.id = storage-group
>>>>>> >> >>>>> retry.backoff.ms = 100
>>>>>> >> >>>>> sasl.kerberos.kinit.cmd = /usr/bin/kinit
>>>>>> >> >>>>> sasl.kerberos.service.name = null
>>>>>> >> >>>>> sasl.kerberos.ticket.renew.jitter = 0.05
>>>>>> >> >>>>> ssl.trustmanager.algorithm = PKIX
>>>>>> >> >>>>> ssl.key.password = null
>>>>>> >> >>>>> fetch.max.wait.ms = 500
>>>>>> >> >>>>> sasl.kerberos.min.time.before.relogin = 60000
>>>>>> >> >>>>> connections.max.idle.ms = 540000
>>>>>> >> >>>>> session.timeout.ms = 30000
>>>>>> >> >>>>> metrics.num.samples = 2
>>>>>> >> >>>>> key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
>>>>>> >> >>>>> ssl.protocol = TLS
>>>>>> >> >>>>> ssl.provider = null
>>>>>> >> >>>>> ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
>>>>>> >> >>>>> ssl.keystore.location = null
>>>>>> >> >>>>> ssl.cipher.suites = null
>>>>>> >> >>>>> security.protocol = PLAINTEXT
>>>>>> >> >>>>> ssl.keymanager.algorithm = SunX509
>>>>>> >> >>>>> metrics.sample.window.ms = 30000
>>>>>> >> >>>>> auto.offset.reset = earliest
>>>>>> >> >>>>>
>>>>>> >> >>>>> *Example usage with KafkaRDD:*
>>>>>> >> >>>>> val channels = Seq("channel1", "channel2")
>>>>>> >> >>>>>
>>>>>> >> >>>>> channels.toParArray.foreach { channel =>
>>>>>> >> >>>>>  val consumer = new KafkaConsumer[K, V](kafkaParams.asJava)
>>>>>> >> >>>>>
>>>>>> >> >>>>>  // Get offsets for the given topic and the consumer group 'storage-group'
>>>>>> >> >>>>>  val offsetRanges = getOffsets("storage-group", channel)
>>>>>> >> >>>>>
>>>>>> >> >>>>>  val ds = KafkaUtils.createRDD[K, V](context,
>>>>>> >> >>>>>        kafkaParams asJava,
>>>>>> >> >>>>>        offsetRanges,
>>>>>> >> >>>>>        PreferConsistent).toDS[V]
>>>>>> >> >>>>>
>>>>>> >> >>>>>  // Do some aggregations
>>>>>> >> >>>>>  ds.agg(...)
>>>>>> >> >>>>>  // Save the data
>>>>>> >> >>>>>  ds.write.mode(SaveMode.Append).parquet(somePath)
>>>>>> >> >>>>>  // Save offsets using a KafkaConsumer
>>>>>> >> >>>>>  consumer.commitSync(newOffsets.asJava)
>>>>>> >> >>>>>  consumer.close()
>>>>>> >> >>>>> }
>>>>>> >> >>>>>
>>>>>> >> >>>>>
>>>>>> >> >>>>> *Example usage with Kafka Stream:*
>>>>>> >> >>>>> This creates a stream and processes events in each partition.
>>>>>> >> >>>>> At the end of processing for each partition, we update the
>>>>>> >> >>>>> offsets for that partition. This is challenging to do, but is
>>>>>> >> >>>>> better than calling commitAsync on the stream, because that
>>>>>> >> >>>>> occurs after the /entire/ RDD has been processed. This method
>>>>>> >> >>>>> minimizes duplicates in an exactly once environment. Since the
>>>>>> >> >>>>> executors use their own custom group
>>>>>> >> >>>>> "spark-executor-processor-group" and the commit is buried in
>>>>>> >> >>>>> private functions, we are unable to use the executor's cached
>>>>>> >> >>>>> consumer to update the offsets. This requires us to go through
>>>>>> >> >>>>> multiple steps to update the Kafka offsets accordingly.
>>>>>> >> >>>>>
>>>>>> >> >>>>> val offsetRanges = getOffsets("processor-group", "my-topic")
>>>>>> >> >>>>>
>>>>>> >> >>>>> val stream = KafkaUtils.createDirectStream[K, V](context,
>>>>>> >> >>>>>      PreferConsistent,
>>>>>> >> >>>>>      Subscribe[K, V](Seq("my-topic") asJavaCollection,
>>>>>> >> >>>>>        kafkaParams,
>>>>>> >> >>>>>        offsetRanges))
>>>>>> >> >>>>>
>>>>>> >> >>>>> stream.foreachRDD { rdd =>
>>>>>> >> >>>>>    val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
>>>>>> >> >>>>>
>>>>>> >> >>>>>    // Transform our data
>>>>>> >> >>>>>   rdd.foreachPartition { events =>
>>>>>> >> >>>>>       // Establish a consumer in the executor so we can update offsets after each partition.
>>>>>> >> >>>>>       // This class is homegrown and uses the KafkaConsumer to help get/set offsets
>>>>>> >> >>>>>       val consumer = new ConsumerUtils(kafkaParams)
>>>>>> >> >>>>>       // do something with our data
>>>>>> >> >>>>>
>>>>>> >> >>>>>       // Write the offsets that were updated in this partition
>>>>>> >> >>>>>       consumer.setConsumerOffsets("processor-group",
>>>>>> >> >>>>>          Map(TopicAndPartition(tp.topic, tp.partition) -> endOffset))
>>>>>> >> >>>>>   }
>>>>>> >> >>>>> }
>>>>>> >> >>>>>
>>>>>> >> >>>>>
>>>>>> >> >>>>>


Re: Instability issues with Spark 2.0.1 and Kafka 0.10

Posted by Ivan von Nagy <iv...@vadio.com>.
Hi Sean,

Thanks for responding. We have run our jobs with internal parallel
processing for well over a year (Spark 1.5, 1.6 and Kafka 0.8.2.2.) and did
not encounter any of these issues until upgrading to Spark 2.0.1 and Kafka
0.10 clients. If we process serially, then we sometimes get the errors, but
far less often. Also, processing sequentially sometimes takes more than 2x
as long, which is not an option for this particular job.

I posted another example on Nov 10th, which is the example below. We
basically iterate through a list in parallel, and sometimes the list could
be upwards of a hundred elements. The parallelism in Scala/Spark limits us to
about 8 at a time on our nodes. For performance reasons we process in
parallel, and we also separate the channels since each has its own topic.
We don't combine them all into one KafkaRDD because that would mean we have
to process all or nothing if an error occurs. This way, if a couple of
channels fail, we can re-run the job and it will only process those channels.
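
One way to express that pattern (bounded parallelism plus a list of failed
channels to re-run) is sketched below with plain Scala futures on a fixed
thread pool rather than toParArray. This is a sketch under assumptions, not
the job's actual code: PerChannelRunner, runAll and the process callback are
hypothetical names, and process stands in for the createRDD / write / commit
steps for a single channel.

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._
import scala.util.{Failure, Try}

object PerChannelRunner {
  // Runs `process` for every channel on a pool of `parallelism` threads and
  // returns the channels whose processing threw, so only those need a re-run.
  def runAll(channels: Seq[String], parallelism: Int)(process: String => Unit): Seq[String] = {
    val pool = Executors.newFixedThreadPool(parallelism)
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(pool)
    try {
      // Wrap each call in Try so one failing channel does not fail the whole batch.
      val attempts = channels.map { ch =>
        Future(Try(process(ch))).map(result => ch -> result)
      }
      val results = Await.result(Future.sequence(attempts), 1.hour)
      results.collect { case (ch, Failure(_)) => ch }
    } finally pool.shutdown()
  }
}
```

For example, PerChannelRunner.runAll(channels, parallelism = 8)(processChannel)
would run at most 8 channels at a time, where processChannel is whatever
function holds the per-channel logic. By itself this only controls how many
jobs the driver submits concurrently; it does not change how consumers are
cached on the executors.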

This has just been perplexing since we had never encountered any errors for
well over a year using the prior versions. At this time, I am just seeking
any configuration options or code changes that we may be missing or even at
a lower level, fundamentally what changed in Spark 2 and Kafka 0.10 that
surfaced these issues.

We continue to use Spark 1.6 with the Kafka 0.8.x clients until this can be
figured out; however, it is a deal breaker for us to upgrade to Spark 2.x
with Kafka 0.10 clients. On a side note, we have not encountered any issues
with the Kafka Producers; this is simply with the KafkaRDD and its use of
CachedKafkaConsumer. Any help is much appreciated.

Thanks,

Ivan

*Example usage with KafkaRDD:*
val channels = Seq("channel1", "channel2")

channels.toParArray.foreach { channel =>
  val consumer = new KafkaConsumer[K, V](kafkaParams.asJava)

  // Get offsets for the given topic and the consumer group "$prefix-$topic"
  val offsetRanges = getOffsets(s"$prefix-$topic", channel)

  val ds = KafkaUtils.createRDD[K, V](context,
        kafkaParams.asJava,
        offsetRanges,
        PreferConsistent).toDS[V]

  // Do some aggregations
  ds.agg(...)
  // Save the data
  ds.write.mode(SaveMode.Append).parquet(somePath)
  // Save offsets using a KafkaConsumer
  consumer.commitSync(newOffsets.asJava)
  consumer.close()
}
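
Since the snippet above passes the same kafkaParams to every createRDD call,
one thing worth checking is whether group.id really differs per channel in
the params Spark sees. A minimal sketch, assuming the group id should follow
the same "$prefix-$channel" pattern used for getOffsets and that kafkaParams
is a Map[String, Object]; PerChannelParams, forChannel and executorGroupId are
hypothetical helpers, and the "spark-executor-" prefix is as described
elsewhere in this thread:

```scala
object PerChannelParams {
  // Hypothetical helper: copies the shared params and overrides group.id for one channel.
  def forChannel(base: Map[String, Object], prefix: String, channel: String): Map[String, Object] =
    base + ("group.id" -> s"$prefix-$channel")

  // Group id the executor-side consumers would end up with, given the
  // "spark-executor-" prefix mentioned in this thread.
  def executorGroupId(params: Map[String, Object]): String =
    "spark-executor-" + params("group.id")
}
```

The result of PerChannelParams.forChannel(kafkaParams, prefix, channel) could
then be converted with .asJava and passed to createRDD in place of the shared
kafkaParams, so that each channel's consumers are cached under a distinct
group id.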

On Sat, Nov 12, 2016 at 11:46 AM, Sean McKibben <gr...@graphex.com> wrote:

> How are you iterating through your RDDs in parallel? In the past (Spark
> 1.5.2) when I've had actions being performed on multiple RDDs concurrently
> using futures, I've encountered some pretty bad behavior in Spark,
> especially during job retries. Very difficult to explain things, like
> records from one RDD leaking into a totally different (though shared
> lineage) RDD during job retries. I'm not sure what documentation exists
> around parallelizing on top of Spark's existing parallelization approach,
> but I would venture a guess that that could be the source of your
> concurrent access problems, and potentially other hidden issues. Have you
> tried a version of your app which doesn't parallelize actions on RDDs, but
> instead serially processes each RDD? I'm sure it isn't ideal
> performance-wise, but it seems like a good choice for an A/B test.
>
> The poll.ms issue could very well be settings or capability of your kafka
> cluster. I think other (non-Spark) approaches may have less consumer churn
> and be less susceptible to things like GC pauses or coordination latency.
> It could also be that the number of consumers being simultaneously created
> on each executor causes a thundering herd problem during initial phases
> (which then causes job retries, which then causes more consumer churn,
> etc.).
>
> Sean
>
>
>
> On Nov 12, 2016, at 11:14 AM, Ivan von Nagy <iv...@vadio.com> wrote:
>
> The code was changed to use a unique group for each KafkaRDD that was
> created (see Nov 10 post). There is no KafkaRDD being reused. The basic
> logic (see Nov 10 post for example) is get a list of channels, iterate
> through them in parallel, load a KafkaRDD using a given topic and a
> consumer group that is made from the topic (each RDD uses a different topic
> and group), process the data and write to Parquet files.
>
> Per my Nov 10th post, we still get polling timeouts unless the poll.ms is
> set to something like 10 seconds. We also get concurrent modification
> exceptions as well. I believe the key here is the processing of data in
> parallel is where we encounter issues so we are looking for some possible
> answers surrounding this.
>
> Thanks,
>
> Ivan
>
>
> On Fri, Nov 11, 2016 at 12:12 PM, Cody Koeninger <co...@koeninger.org>
> wrote:
>
>> It is already documented that you must use a different group id, which as
>> far as I can tell you are still not doing.
>>
>> On Nov 10, 2016 7:43 PM, "Shixiong(Ryan) Zhu" <sh...@databricks.com>
>> wrote:
>>
>>> Yeah, the KafkaRDD cannot be reused. It's better to document it.
>>>
>>> On Thu, Nov 10, 2016 at 8:26 AM, Ivan von Nagy <iv...@vadio.com> wrote:
>>>
>>>> Ok, I have split the KafkaRDD logic so that each uses its own group and
>>>> bumped the poll.ms to 10 seconds. Anything less than 2 seconds on the
>>>> poll.ms ends up with a timeout and exception so I am still perplexed
>>>> on that one. The new error I am getting now is a
>>>> `ConcurrentModificationException` when Spark is trying to remove the
>>>> CachedKafkaConsumer.
>>>>
>>>> java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access
>>>> at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1431)
>>>> at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:1361)
>>>> at org.apache.spark.streaming.kafka010.CachedKafkaConsumer$$anon$1.removeEldestEntry(CachedKafkaConsumer.scala:128)
>>>> at java.util.LinkedHashMap.afterNodeInsertion(LinkedHashMap.java:299)
>>>>
>>>> Here is the basic logic:
>>>>
>>>> *Using KafkaRDD* - This takes a list of channels and processes them in
>>>> parallel using the KafkaRDD directly. They each use a distinct consumer
>>>> group (s"$prefix-$topic"), and each has its own topic and each topic
>>>> has 4 partitions. We routinely get timeout errors when polling for data
>>>> when the poll.ms is less than 2 seconds. This occurs whether or not we
>>>> process in parallel.
>>>>
>>>> *Example usage with KafkaRDD:*
>>>> val channels = Seq("channel1", "channel2")
>>>>
>>>> channels.toParArray.foreach { channel =>
>>>>   val consumer = new KafkaConsumer[K, V](kafkaParams.asJava)
>>>>
>>>>   // Get offsets for the given topic and the consumer group "$prefix-$topic"
>>>>   val offsetRanges = getOffsets(s"$prefix-$topic", channel)
>>>>
>>>>   val ds = KafkaUtils.createRDD[K, V](context,
>>>>         kafkaParams asJava,
>>>>         offsetRanges,
>>>>         PreferConsistent).toDS[V]
>>>>
>>>>   // Do some aggregations
>>>>   ds.agg(...)
>>>>   // Save the data
>>>>   ds.write.mode(SaveMode.Append).parquet(somePath)
>>>>   // Save offsets using a KafkaConsumer
>>>>   consumer.commitSync(newOffsets.asJava)
>>>>   consumer.close()
>>>> }
>>>>
>>>> I am not sure why the concurrency issue is there. I have tried to
>>>> debug it and have also looked at the KafkaConsumer code, and from what I
>>>> can see it should not occur. The things to figure out are why this
>>>> occurs when running in parallel and why the timeouts still occur.
>>>>
>>>> Thanks,
>>>>
>>>> Ivan
>>>>
>>>> On Mon, Nov 7, 2016 at 11:55 AM, Cody Koeninger <co...@koeninger.org>
>>>> wrote:
>>>>
>>>>> There definitely is Kafka documentation indicating that you should use
>>>>> a different consumer group for logically different subscribers, this
>>>>> is really basic to Kafka:
>>>>>
>>>>> http://kafka.apache.org/documentation#intro_consumers
>>>>>
>>>>> As for your comment that "commit async after each RDD, which is not
>>>>> really viable also", how is it not viable?  Again, committing offsets
>>>>> to Kafka doesn't give you reliable delivery semantics unless your
>>>>> downstream data store is idempotent.  If your downstream data store is
>>>>> idempotent, then it shouldn't matter to you when offset commits
>>>>> happen, as long as they happen within a reasonable time after the data
>>>>> is written.
>>>>>
>>>>> Do you want to keep arguing with me, or follow my advice and proceed
>>>>> with debugging any remaining issues after you make the changes I
>>>>> suggested?
>>>>>
>>>>> On Mon, Nov 7, 2016 at 1:35 PM, Ivan von Nagy <iv...@vadio.com> wrote:
>>>>> > With our stream version, we update the offsets for only the partition
>>>>> > we are operating on. We even break down the partition into smaller
>>>>> > batches and then update the offsets after each batch within the
>>>>> > partition. With Spark 1.6 and Kafka 0.8.x this was not an issue, and as
>>>>> > Sean pointed out, this is not necessarily a Spark issue since Kafka no
>>>>> > longer allows you to simply update the offsets for a given consumer
>>>>> > group. You have to subscribe or assign partitions to even do so.
>>>>> >
>>>>> > As for storing the offsets in some other place like a DB, I don't find
>>>>> > this useful because you then can't use tools like Kafka Manager. In
>>>>> > order to do so you would have to store in a DB and then circle back and
>>>>> > update Kafka afterwards. This means you have to keep two sources in
>>>>> > sync, which is not really a good idea.
>>>>> >
>>>>> > It is a challenge in Spark to use the Kafka offsets since the driver
>>>>> > keeps subscribed to the topic(s) and consumer group, while the
>>>>> > executors prepend "spark-executor-" to the consumer group. The stream
>>>>> > (driver) does allow you to commit async after each RDD, which is not
>>>>> > really viable also. I have thought of implementing an Akka actor system
>>>>> > on the driver and sending it messages from the executor code to update
>>>>> > the offsets, but then that is asynchronous as well, so not really a
>>>>> > good solution.
>>>>> >
>>>>> > I have no idea why Kafka made this change and also why in the parallel
>>>>> > KafkaRDD application we would be advised to use different consumer
>>>>> > groups for each RDD. It seems strange to me that different consumer
>>>>> > groups would be required or advised. There is no Kafka documentation
>>>>> > that I know of that states this. The biggest issue I see with the
>>>>> > parallel KafkaRDD is the timeouts. I have tried to set poll.ms to 30
>>>>> > seconds and still get the issue. Something is not right here. As I
>>>>> > mentioned with the streaming application, with Spark 1.6 and Kafka
>>>>> > 0.8.x we never saw this issue. We have been running the same basic
>>>>> > logic for over a year now without one hitch at all.
>>>>> >
>>>>> > Ivan
>>>>> >
>>>>> >
>>>>> > On Mon, Nov 7, 2016 at 11:16 AM, Cody Koeninger <co...@koeninger.org>
>>>>> wrote:
>>>>> >>
>>>>> >> Someone can correct me, but I'm pretty sure Spark dstreams (in
>>>>> >> general, not just kafka) have been progressing on to the next batch
>>>>> >> after a given batch aborts for quite some time now.  Yet another
>>>>> >> reason I put offsets in my database transactionally.  My jobs throw
>>>>> >> exceptions if the offset in the DB isn't what I expected it to be.
>>>>> >>
>>>>> >>
>>>>> >>
>>>>> >>
>>>>> >> On Mon, Nov 7, 2016 at 1:08 PM, Sean McKibben <gr...@graphex.com>
>>>>> wrote:
>>>>> >> > I've been encountering the same kinds of timeout issues as Ivan,
>>>>> using
>>>>> >> > the "Kafka Stream" approach that he is using, except I'm storing
>>>>> my offsets
>>>>> >> > manually from the driver to Zookeeper in the Kafka 8 format. I
>>>>> haven't yet
>>>>> >> > implemented the KafkaRDD approach, and therefore don't have the
>>>>> concurrency
>>>>> >> > issues, but a very similar use case is coming up for me soon,
>>>>> it's just been
>>>>> >> > backburnered until I can get streaming to be more reliable (I will
>>>>> >> > definitely ensure unique group IDs when I do). Offset commits are
>>>>> certainly
>>>>> >> > more painful in Kafka 0.10, and that doesn't have anything to do
>>>>> with Spark.
>>>>> >> >
>>>>> >> > While i may be able to alleviate the timeout by just increasing
>>>>> it, I've
>>>>> >> > noticed something else that is more worrying: When one task fails
>>>>> 4 times in
>>>>> >> > a row (i.e. "Failed to get records for _ after polling for _"),
>>>>> Spark aborts
>>>>> >> > the Stage and Job with "Job aborted due to stage failure: Task _
>>>>> in stage _
>>>>> >> > failed 4 times". That's fine, and it's the behavior I want, but
>>>>> instead of
>>>>> >> > stopping the Application there (as previous versions of Spark
>>>>> did) the next
>>>>> >> > microbatch marches on and offsets are committed ahead of the
>>>>> failed
>>>>> >> > microbatch. Suddenly my at-least-once app becomes more
>>>>> >> > sometimes-at-least-once which is no good. In order for spark to
>>>>> display that
>>>>> >> > failure, I must be propagating the errors up to Spark, but the
>>>>> behavior of
>>>>> >> > marching forward with the next microbatch seems to be new, and a
>>>>> big
>>>>> >> > potential for data loss in streaming applications.
>>>>> >> >
>>>>> >> > Am I perhaps missing a setting to stop the entire streaming
>>>>> application
>>>>> >> > once spark.task.maxFailures is reached? Has anyone else seen this
>>>>> behavior
>>>>> >> > of a streaming application skipping over failed microbatches?
>>>>> >> >
>>>>> >> > Thanks,
>>>>> >> > Sean
>>>>> >> >
>>>>> >> >
>>>>> >> >> On Nov 4, 2016, at 2:48 PM, Cody Koeninger <co...@koeninger.org>
>>>>> wrote:
>>>>> >> >>
>>>>> >> >> So basically what I am saying is
>>>>> >> >>
>>>>> >> >> - increase poll.ms
>>>>> >> >> - use a separate group id everywhere
>>>>> >> >> - stop committing offsets under the covers
>>>>> >> >>
>>>>> >> >> That should eliminate all of those as possible causes, and then
>>>>> we can
>>>>> >> >> see if there are still issues.
>>>>> >> >>
>>>>> >> >> As far as 0.8 vs 0.10, Spark doesn't require you to assign or
>>>>> >> >> subscribe to a topic in order to update offsets, Kafka does.  If
>>>>> you
>>>>> >> >> don't like the new Kafka consumer api, the existing 0.8 simple
>>>>> >> >> consumer api should be usable with later brokers.  As long as you
>>>>> >> >> don't need SSL or dynamic subscriptions, and it meets your
>>>>> needs, keep
>>>>> >> >> using it.
>>>>> >> >>
>>>>> >> >> On Fri, Nov 4, 2016 at 3:37 PM, Ivan von Nagy <iv...@vadio.com>
>>>>> wrote:
>>>>> >> >>> Yes, the parallel KafkaRDD uses the same consumer group, but
>>>>> each RDD
>>>>> >> >>> uses a
>>>>> >> >>> single distinct topic. For example, the group would be
>>>>> something like
>>>>> >> >>> "storage-group", and the topics would be "storage-channel1", and
>>>>> >> >>> "storage-channel2". In each thread a KafkaConsumer is started,
>>>>> >> >>> assigned the
>>>>> >> >>> partitions assigned, and then commit offsets are called after
>>>>> the RDD
>>>>> >> >>> is
>>>>> >> >>> processed. This should not interfere with the consumer group
>>>>> used by
>>>>> >> >>> the
>>>>> >> >>> executors which would be "spark-executor-storage-group".
>>>>> >> >>>
>>>>> >> >>> In the streaming example there is a single topic
>>>>> ("client-events") and
>>>>> >> >>> group
>>>>> >> >>> ("processing-group"). A single stream is created and offsets are
>>>>> >> >>> manually
>>>>> >> >>> updated from the executor after each partition is handled. This
>>>>> was a
>>>>> >> >>> challenge since Spark now requires one to assign or subscribe
>>>>> to a
>>>>> >> >>> topic in
>>>>> >> >>> order to even update the offsets. In 0.8.2.x you did not have
>>>>> to worry
>>>>> >> >>> about
>>>>> >> >>> that. This approach limits your exposure to duplicate data since
>>>>> >> >>> idempotent
>>>>> >> >>> records are not entirely possible in our scenario. At least
>>>>> without a
>>>>> >> >>> lot of
>>>>> >> >>> re-running of logic to de-dup.
>>>>> >> >>>
>>>>> >> >>> Thanks,
>>>>> >> >>>
>>>>> >> >>> Ivan
>>>>> >> >>>
>>>>> >> >>> On Fri, Nov 4, 2016 at 1:24 PM, Cody Koeninger <
>>>>> cody@koeninger.org>
>>>>> >> >>> wrote:
>>>>> >> >>>>
>>>>> >> >>>> So just to be clear, the answers to my questions are
>>>>> >> >>>>
>>>>> >> >>>> - you are not using different group ids, you're using the same
>>>>> group
>>>>> >> >>>> id everywhere
>>>>> >> >>>>
>>>>> >> >>>> - you are committing offsets manually
>>>>> >> >>>>
>>>>> >> >>>> Right?
>>>>> >> >>>>
>>>>> >> >>>> If you want to eliminate network or kafka misbehavior as a
>>>>> source,
>>>>> >> >>>> tune poll.ms upwards even higher.
>>>>> >> >>>>
>>>>> >> >>>> You must use different group ids for different rdds or streams.
>>>>> >> >>>> Kafka consumers won't behave the way you expect if they are
>>>>> all in
>>>>> >> >>>> the
>>>>> >> >>>> same group id, and the consumer cache is keyed by group id.
>>>>> Yes, the
>>>>> >> >>>> executor will tack "spark-executor-" on to the beginning, but
>>>>> if you
>>>>> >> >>>> give it the same base group id, it will be the same.  And the
>>>>> driver
>>>>> >> >>>> will use the group id you gave it, unmodified.
>>>>> >> >>>>
>>>>> >> >>>> Finally, I really can't help you if you're manually writing
>>>>> your own
>>>>> >> >>>> code to commit offsets directly to Kafka.  Trying to minimize
>>>>> >> >>>> duplicates that way doesn't really make sense, your system
>>>>> must be
>>>>> >> >>>> able to handle duplicates if you're using kafka as an offsets
>>>>> store,
>>>>> >> >>>> it can't do transactional exactly once.
>>>>> >> >>>>
>>>>> >> >>>> On Fri, Nov 4, 2016 at 1:48 PM, vonnagy <iv...@vadio.com>
>>>>> wrote:
>>>>> >> >>>>> Here are some examples and details of the scenarios. The KafkaRDD
>>>>> >> >>>>> is the most prone to polling timeouts and concurrent modification
>>>>> >> >>>>> errors.
>>>>> >> >>>>>
>>>>> >> >>>>> *Using KafkaRDD* - This takes a list of channels and processes
>>>>> >> >>>>> them in parallel using the KafkaRDD directly. They all use the
>>>>> >> >>>>> same consumer group ('storage-group'), but each has its own topic
>>>>> >> >>>>> and each topic has 4 partitions. We routinely get timeout errors
>>>>> >> >>>>> when polling for data. This occurs whether we process in parallel
>>>>> >> >>>>> or sequentially.
>>>>> >> >>>>>
>>>>> >> >>>>> *Spark Kafka setting:*
>>>>> >> >>>>> spark.streaming.kafka.consumer.poll.ms=2000
>>>>> >> >>>>>
>>>>> >> >>>>> *Kafka Consumer Params:*
>>>>> >> >>>>> metric.reporters = []
>>>>> >> >>>>> metadata.max.age.ms = 300000
>>>>> >> >>>>> partition.assignment.strategy =
>>>>> >> >>>>> [org.apache.kafka.clients.consumer.RangeAssignor]
>>>>> >> >>>>> reconnect.backoff.ms = 50
>>>>> >> >>>>> sasl.kerberos.ticket.renew.window.factor = 0.8
>>>>> >> >>>>> max.partition.fetch.bytes = 1048576
>>>>> >> >>>>> bootstrap.servers = [somemachine:31000]
>>>>> >> >>>>> ssl.keystore.type = JKS
>>>>> >> >>>>> enable.auto.commit = false
>>>>> >> >>>>> sasl.mechanism = GSSAPI
>>>>> >> >>>>> interceptor.classes = null
>>>>> >> >>>>> exclude.internal.topics = true
>>>>> >> >>>>> ssl.truststore.password = null
>>>>> >> >>>>> client.id =
>>>>> >> >>>>> ssl.endpoint.identification.algorithm = null
>>>>> >> >>>>> max.poll.records = 1000
>>>>> >> >>>>> check.crcs = true
>>>>> >> >>>>> request.timeout.ms = 40000
>>>>> >> >>>>> heartbeat.interval.ms = 3000
>>>>> >> >>>>> auto.commit.interval.ms = 5000
>>>>> >> >>>>> receive.buffer.bytes = 65536
>>>>> >> >>>>> ssl.truststore.type = JKS
>>>>> >> >>>>> ssl.truststore.location = null
>>>>> >> >>>>> ssl.keystore.password = null
>>>>> >> >>>>> fetch.min.bytes = 1
>>>>> >> >>>>> send.buffer.bytes = 131072
>>>>> >> >>>>> value.deserializer = class
>>>>> >> >>>>> com.vadio.analytics.spark.storage.ClientEventJsonOptionDeserializer
>>>>> >> >>>>> group.id = storage-group
>>>>> >> >>>>> retry.backoff.ms = 100
>>>>> >> >>>>> sasl.kerberos.kinit.cmd = /usr/bin/kinit
>>>>> >> >>>>> sasl.kerberos.service.name = null
>>>>> >> >>>>> sasl.kerberos.ticket.renew.jitter = 0.05
>>>>> >> >>>>> ssl.trustmanager.algorithm = PKIX
>>>>> >> >>>>> ssl.key.password = null
>>>>> >> >>>>> fetch.max.wait.ms = 500
>>>>> >> >>>>> sasl.kerberos.min.time.before.relogin = 60000
>>>>> >> >>>>> connections.max.idle.ms = 540000
>>>>> >> >>>>> session.timeout.ms = 30000
>>>>> >> >>>>> metrics.num.samples = 2
>>>>> >> >>>>> key.deserializer = class
>>>>> >> >>>>> org.apache.kafka.common.serialization.StringDeserializer
>>>>> >> >>>>> ssl.protocol = TLS
>>>>> >> >>>>> ssl.provider = null
>>>>> >> >>>>> ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
>>>>> >> >>>>> ssl.keystore.location = null
>>>>> >> >>>>> ssl.cipher.suites = null
>>>>> >> >>>>> security.protocol = PLAINTEXT
>>>>> >> >>>>> ssl.keymanager.algorithm = SunX509
>>>>> >> >>>>> metrics.sample.window.ms = 30000
>>>>> >> >>>>> auto.offset.reset = earliest
>>>>> >> >>>>>
>>>>> >> >>>>> *Example usage with KafkaRDD:*
>>>>> >> >>>>> val channels = Seq("channel1", "channel2")
>>>>> >> >>>>>
>>>>> >> >>>>> channels.toParArray.foreach { channel =>
>>>>> >> >>>>>  val consumer = new KafkaConsumer[K, V](kafkaParams.asJava)
>>>>> >> >>>>>
>>>>> >> >>>>>  // Get offsets for the given topic and the consumer group 'storage-group'
>>>>> >> >>>>>  val offsetRanges = getOffsets("storage-group", channel)
>>>>> >> >>>>>
>>>>> >> >>>>>  val ds = KafkaUtils.createRDD[K, V](context,
>>>>> >> >>>>>        kafkaParams asJava,
>>>>> >> >>>>>        offsetRanges,
>>>>> >> >>>>>        PreferConsistent).toDS[V]
>>>>> >> >>>>>
>>>>> >> >>>>>  // Do some aggregations
>>>>> >> >>>>>  ds.agg(...)
>>>>> >> >>>>>  // Save the data
>>>>> >> >>>>>  ds.write.mode(SaveMode.Append).parquet(somePath)
>>>>> >> >>>>>  // Save offsets using a KafkaConsumer
>>>>> >> >>>>>  consumer.commitSync(newOffsets.asJava)
>>>>> >> >>>>>  consumer.close()
>>>>> >> >>>>> }
>>>>> >> >>>>>
>>>>> >> >>>>>
>>>>> >> >>>>> *Example usage with Kafka Stream:*
>>>>> >> >>>>> This creates a stream and processes events in each partition. At
>>>>> >> >>>>> the end of processing for each partition, we update the offsets
>>>>> >> >>>>> for that partition. This is challenging to do, but is better than
>>>>> >> >>>>> calling commitAsync on the stream, because that occurs after the
>>>>> >> >>>>> /entire/ RDD has been processed. This method minimizes duplicates
>>>>> >> >>>>> in an exactly once environment. Since the executors use their own
>>>>> >> >>>>> custom group "spark-executor-processor-group" and the commit is
>>>>> >> >>>>> buried in private functions, we are unable to use the executors'
>>>>> >> >>>>> cached consumer to update the offsets. This requires us to go
>>>>> >> >>>>> through multiple steps to update the Kafka offsets accordingly.
>>>>> >> >>>>>
>>>>> >> >>>>> val offsetRanges = getOffsets("processor-group", "my-topic")
>>>>> >> >>>>>
>>>>> >> >>>>> val stream = KafkaUtils.createDirectStream[K, V](context,
>>>>> >> >>>>>      PreferConsistent,
>>>>> >> >>>>>      Subscribe[K, V](Seq("my-topic") asJavaCollection,
>>>>> >> >>>>>        kafkaParams,
>>>>> >> >>>>>        offsetRanges))
>>>>> >> >>>>>
>>>>> >> >>>>> stream.foreachRDD { rdd =>
>>>>> >> >>>>>    val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
>>>>> >> >>>>>
>>>>> >> >>>>>    // Transform our data
>>>>> >> >>>>>   rdd.foreachPartition { events =>
>>>>> >> >>>>>       // Establish a consumer in the executor so we can update
>>>>> >> >>>>>       // offsets after each partition.
>>>>> >> >>>>>       // This class is homegrown and uses the KafkaConsumer to
>>>>> >> >>>>>       // help get/set offsets
>>>>> >> >>>>>       val consumer = new ConsumerUtils(kafkaParams)
>>>>> >> >>>>>       // do something with our data
>>>>> >> >>>>>
>>>>> >> >>>>>       // Write the offsets that were updated in this partition
>>>>> >> >>>>>       consumer.setConsumerOffsets("processor-group",
>>>>> >> >>>>>          Map(TopicAndPartition(tp.topic, tp.partition) -> endOffset))
>>>>> >> >>>>>   }
>>>>> >> >>>>> }
>>>>> >> >>>>>
>>>>> >> >>>>>
>>>>> >> >>>>>
>>>>> >> >>>
>>>>> >> >>>
>>>>> >> >>
>>>>> >> >>
>>>>> >> >
>>>>> >
>>>>> >
>>>>>
>>>>
>>>>
>>>
>
>

Re: Instability issues with Spark 2.0.1 and Kafka 0.10

Posted by Sean McKibben <gr...@graphex.com>.
How are you iterating through your RDDs in parallel? In the past (Spark 1.5.2), when I've had actions being performed on multiple RDDs concurrently using futures, I've encountered some pretty bad behavior in Spark, especially during job retries. I saw things that were very difficult to explain, such as records from one RDD leaking into a totally different RDD (though one with shared lineage) during job retries. I'm not sure what documentation exists around parallelizing on top of Spark's existing parallelization approach, but I would venture a guess that it could be the source of your concurrent access problems, and potentially of other hidden issues. Have you tried a version of your app which doesn't parallelize actions on RDDs, but instead processes each RDD serially? I'm sure it isn't ideal performance-wise, but it seems like a good choice for an A/B test.
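
A rough sketch of what such an A/B switch could look like, using only plain Scala futures so it runs without Spark or Kafka. The names `ChannelRunner`, `runAll`, and `groupIdFor` are made up for illustration, and `process` stands in for whatever builds, aggregates, and saves the KafkaRDD for one channel; none of this is taken from the actual application.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object ChannelRunner {
  // Hypothetical helper: one distinct consumer group per topic,
  // following the s"$prefix-$topic" convention from the Nov 10 post.
  def groupIdFor(prefix: String, topic: String): String = s"$prefix-$topic"

  // Runs `process` once per channel. With parallel = false the channels
  // are handled one after another on the calling thread; with
  // parallel = true each channel gets its own Future.
  def runAll[A](channels: Seq[String], parallel: Boolean)(process: String => A): Seq[A] =
    if (parallel) {
      val futures = channels.map(channel => Future(process(channel)))
      // The 10 minute limit is an arbitrary placeholder.
      Await.result(Future.sequence(futures), 10.minutes)
    } else {
      channels.map(process)
    }
}
```

Flipping the `parallel` flag between two otherwise identical runs would show whether the timeouts and the ConcurrentModificationException only appear when several RDDs are being processed at once.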

The poll.ms issue could very well come down to the settings or capability of your Kafka cluster. I think other (non-Spark) approaches may have less consumer churn and be less susceptible to things like GC pauses or coordination latency. It could also be that the number of consumers being created simultaneously on each executor causes a thundering herd problem during the initial phases (which then causes job retries, which then cause more consumer churn, etc.).
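
To make the consumer churn point a bit more concrete, here is a toy model of an LRU consumer cache. It assumes, as the stack trace in the Nov 10 post suggests, that the executor keeps consumers in a `LinkedHashMap` and closes whichever entry `removeEldestEntry` evicts. `ConsumerCacheModel`, `FakeConsumer`, and `LruCache` are hypothetical names, not Spark classes, and the key of group id, topic, and partition is my assumption based on Cody's remark that the cache is keyed by group id.

```scala
import java.util.{LinkedHashMap => JLinkedHashMap, Map => JMap}
import scala.collection.mutable.ArrayBuffer

object ConsumerCacheModel {
  // Assumed cache key: one consumer per (group id, topic, partition).
  final case class Key(groupId: String, topic: String, partition: Int)

  // Stand-in for a cached consumer; it only records whether close() ran.
  final class FakeConsumer(val key: Key) {
    @volatile var closed: Boolean = false
    def close(): Unit = { closed = true }
  }

  final class LruCache(maxCapacity: Int) {
    // Keys evicted so far, in eviction order.
    val evicted: ArrayBuffer[Key] = ArrayBuffer.empty[Key]

    // Access-ordered map: once it grows past maxCapacity, the least
    // recently used consumer is closed and dropped.
    private val cache =
      new JLinkedHashMap[Key, FakeConsumer](16, 0.75f, true) {
        override def removeEldestEntry(eldest: JMap.Entry[Key, FakeConsumer]): Boolean =
          if (this.size() > maxCapacity) {
            eldest.getValue.close()
            evicted += eldest.getKey
            true
          } else false
      }

    // Returns the cached consumer for `key`, creating one on a miss.
    def getOrCreate(key: Key): FakeConsumer = synchronized {
      val existing = cache.get(key)
      if (existing != null) existing
      else {
        val created = new FakeConsumer(key)
        cache.put(key, created)
        created
      }
    }
  }
}
```

With a capacity of 2, asking for a third key closes the first consumer even though the caller that obtained it may still hold a reference. In the real cache, if another task thread were in the middle of a poll on that consumer, I would expect the close to trip KafkaConsumer's single-thread check, which is what the trace shows. If that is what is happening, a larger cache (I believe the setting is `spark.streaming.kafka.consumer.cache.maxCapacity`) should at least make the evictions less frequent.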

Sean


> On Nov 12, 2016, at 11:14 AM, Ivan von Nagy <iv...@vadio.com> wrote:
> 
> The code was changed to use a unique group for each KafkaRDD that was created (see Nov 10 post). There is no KafkaRDD being reused. The basic logic (see Nov 10 post for example) is get a list of channels, iterate through them in parallel, load a KafkaRDD using a given topic and a consumer group that is made from the topic (each RDD uses a different topic and group), process the data and write to Parquet files.
> 
> Per my Nov 10th post, we still get polling timeouts unless the poll.ms is set to something like 10 seconds. We also get concurrent modification exceptions. I believe the key here is that processing data in parallel is where we encounter issues, so we are looking for some possible answers surrounding this.
> 
> Thanks,
> 
> Ivan
> 
> 
> On Fri, Nov 11, 2016 at 12:12 PM, Cody Koeninger <cody@koeninger.org> wrote:
> It is already documented that you must use a different group id, which as far as I can tell you are still not doing.
> 
> On Nov 10, 2016 7:43 PM, "Shixiong(Ryan) Zhu" <shixiong@databricks.com> wrote:
> Yeah, the KafkaRDD cannot be reused. It's better to document it.
> 
> On Thu, Nov 10, 2016 at 8:26 AM, Ivan von Nagy <ivan@vadio.com> wrote:
> Ok, I have split the KafkaRDD logic so that each uses its own group and bumped the poll.ms to 10 seconds. Anything less than 2 seconds on the poll.ms ends up with a timeout and exception, so I am still perplexed on that one. The new error I am getting now is a `ConcurrentModificationException` when Spark is trying to remove the CachedKafkaConsumer.
> 
> java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access
> 	at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1431)
> 	at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:1361)
> 	at org.apache.spark.streaming.kafka010.CachedKafkaConsumer$$anon$1.removeEldestEntry(CachedKafkaConsumer.scala:128)
> 	at java.util.LinkedHashMap.afterNodeInsertion(LinkedHashMap.java:299)
> 	
> 
> Here is the basic logic: 
> 
> Using KafkaRDD - This takes a list of channels and processes them in parallel using the KafkaRDD directly. They each use a distinct consumer group (s"$prefix-$topic"), and each has its own topic and each topic has 4 partitions. We routinely get timeout errors when polling for data when the poll.ms is less than 2 seconds. This occurs whether we process in parallel. 
> 
> Example usage with KafkaRDD:
> val channels = Seq("channel1", "channel2") 
> 
> channels.toParArray.foreach { channel => 
>   val consumer = new KafkaConsumer[K, V](kafkaParams.asJava) 
> 
>   // Get offsets for the given topic and the consumer group "$prefix-$topic" 
>   val offsetRanges = getOffsets(s"$prefix-$topic", channel) 
> 
>   val ds = KafkaUtils.createRDD[K, V](context, 
>         kafkaParams asJava, 
>         offsetRanges, 
>         PreferConsistent).toDS[V] 
> 
>   // Do some aggregations 
>   ds.agg(...) 
>   // Save the data 
>   ds.write.mode(SaveMode.Append).parquet(somePath) 
>   // Save offsets using a KafkaConsumer 
>   consumer.commitSync(newOffsets.asJava) 
>   consumer.close() 
> } 
> 
> I am not sure why the concurrency issue is there. I have tried to debug it and have also looked at the KafkaConsumer code, and from what I can see it should not occur. The things to figure out are why this occurs when running in parallel and why the timeouts still occur.
> 
> Thanks,
> 
> Ivan
> 
> On Mon, Nov 7, 2016 at 11:55 AM, Cody Koeninger <cody@koeninger.org> wrote:
> There definitely is Kafka documentation indicating that you should use
> a different consumer group for logically different subscribers, this
> is really basic to Kafka:
> 
> http://kafka.apache.org/documentation#intro_consumers
> 
> As for your comment that "commit async after each RDD, which is not
> really viable also", how is it not viable?  Again, committing offsets
> to Kafka doesn't give you reliable delivery semantics unless your
> downstream data store is idempotent.  If your downstream data store is
> idempotent, then it shouldn't matter to you when offset commits
> happen, as long as they happen within a reasonable time after the data
> is written.
> 
> Do you want to keep arguing with me, or follow my advice and proceed
> with debugging any remaining issues after you make the changes I
> suggested?
> 
> On Mon, Nov 7, 2016 at 1:35 PM, Ivan von Nagy <ivan@vadio.com> wrote:
> > With our stream version, we update the offsets for only the partition we
> > are operating on. We even break down the partition into smaller batches and
> > then update the offsets after each batch within the partition. With Spark 1.6
> > and Kafka 0.8.x this was not an issue, and as Sean pointed out, this is not
> > necessarily a Spark issue since Kafka no longer allows you to simply update
> > the offsets for a given consumer group. You have to subscribe or assign
> > partitions to even do so.
> >
> > As for storing the offsets in some other place like a DB, I don't find this
> > useful because you then can't use tools like Kafka Manager. In order to do
> > so you would have to store in a DB and then circle back and update Kafka
> > afterwards. This means you have to keep two sources in sync, which is not
> > really a good idea.
> >
> > It is a challenge in Spark to use the Kafka offsets since the driver keeps
> > subscribed to the topic(s) and consumer group, while the executors prepend
> > "spark-executor-" to the consumer group. The stream (driver) does allow you
> > to commit async after each RDD, which is not really viable also. I have
> > thought of implementing an Akka actor system on the driver and sending it
> > messages from the executor code to update the offsets, but then that is
> > asynchronous as well, so not really a good solution.
> >
> > I have no idea why Kafka made this change and also why in the parallel
> > KafkaRDD application we would be advised to use different consumer groups
> > for each RDD. It seems strange to me that different consumer groups would
> > be required or advised. There is no Kafka documentation that I know of that
> > states this. The biggest issue I see with the parallel KafkaRDD is the
> > timeouts. I have tried to set poll.ms to 30 seconds and still get the issue.
> > Something is not right here. As I mentioned with the
> > streaming application, with Spark 1.6 and Kafka 0.8.x we never saw this
> > issue. We have been running the same basic logic for over a year now without
> > one hitch at all.
> >
> > Ivan
> >
> >
> > On Mon, Nov 7, 2016 at 11:16 AM, Cody Koeninger <cody@koeninger.org> wrote:
> >>
> >> Someone can correct me, but I'm pretty sure Spark dstreams (in
> >> general, not just kafka) have been progressing on to the next batch
> >> after a given batch aborts for quite some time now.  Yet another
> >> reason I put offsets in my database transactionally.  My jobs throw
> >> exceptions if the offset in the DB isn't what I expected it to be.
> >>
> >>
> >>
> >>
> >> On Mon, Nov 7, 2016 at 1:08 PM, Sean McKibben <graphex@graphex.com> wrote:
> >> > I've been encountering the same kinds of timeout issues as Ivan, using
> >> > the "Kafka Stream" approach that he is using, except I'm storing my offsets
> >> > manually from the driver to Zookeeper in the Kafka 8 format. I haven't yet
> >> > implemented the KafkaRDD approach, and therefore don't have the concurrency
> >> > issues, but a very similar use case is coming up for me soon, it's just been
> >> > backburnered until I can get streaming to be more reliable (I will
> >> > definitely ensure unique group IDs when I do). Offset commits are certainly
> >> > more painful in Kafka 0.10, and that doesn't have anything to do with Spark.
> >> >
> >> > While I may be able to alleviate the timeout by just increasing it, I've
> >> > noticed something else that is more worrying: When one task fails 4 times in
> >> > a row (i.e. "Failed to get records for _ after polling for _"), Spark aborts
> >> > the Stage and Job with "Job aborted due to stage failure: Task _ in stage _
> >> > failed 4 times". That's fine, and it's the behavior I want, but instead of
> >> > stopping the Application there (as previous versions of Spark did) the next
> >> > microbatch marches on and offsets are committed ahead of the failed
> >> > microbatch. Suddenly my at-least-once app becomes more
> >> > sometimes-at-least-once which is no good. In order for spark to display that
> >> > failure, I must be propagating the errors up to Spark, but the behavior of
> >> > marching forward with the next microbatch seems to be new, and a big
> >> > potential for data loss in streaming applications.
> >> >
> >> > Am I perhaps missing a setting to stop the entire streaming application
> >> > once spark.task.maxFailures is reached? Has anyone else seen this behavior
> >> > of a streaming application skipping over failed microbatches?
> >> >
> >> > Thanks,
> >> > Sean
> >> >
> >> >
> >> >> On Nov 4, 2016, at 2:48 PM, Cody Koeninger <cody@koeninger.org> wrote:
> >> >>
> >> >> So basically what I am saying is
> >> >>
> >> >> - increase poll.ms
> >> >> - use a separate group id everywhere
> >> >> - stop committing offsets under the covers
> >> >>
> >> >> That should eliminate all of those as possible causes, and then we can
> >> >> see if there are still issues.
> >> >>
> >> >> As far as 0.8 vs 0.10, Spark doesn't require you to assign or
> >> >> subscribe to a topic in order to update offsets, Kafka does.  If you
> >> >> don't like the new Kafka consumer api, the existing 0.8 simple
> >> >> consumer api should be usable with later brokers.  As long as you
> >> >> don't need SSL or dynamic subscriptions, and it meets your needs, keep
> >> >> using it.
> >> >>
> >> >> On Fri, Nov 4, 2016 at 3:37 PM, Ivan von Nagy <ivan@vadio.com> wrote:
> >> >>> Yes, the parallel KafkaRDD uses the same consumer group, but each RDD
> >> >>> uses a single distinct topic. For example, the group would be something
> >> >>> like "storage-group", and the topics would be "storage-channel1" and
> >> >>> "storage-channel2". In each thread a KafkaConsumer is started and
> >> >>> assigned its partitions, and then offsets are committed after the RDD
> >> >>> is processed. This should not interfere with the consumer group used by
> >> >>> the executors, which would be "spark-executor-storage-group".
> >> >>>
> >> >>> In the streaming example there is a single topic ("client-events") and
> >> >>> group
> >> >>> ("processing-group"). A single stream is created and offsets are
> >> >>> manually
> >> >>> updated from the executor after each partition is handled. This was a
> >> >>> challenge since Spark now requires one to assign or subscribe to a
> >> >>> topic in
> >> >>> order to even update the offsets. In 0.8.2.x you did not have to worry
> >> >>> about
> >> >>> that. This approach limits your exposure to duplicate data since
> >> >>> idempotent
> >> >>> records are not entirely possible in our scenario. At least without a
> >> >>> lot of
> >> >>> re-running of logic to de-dup.
> >> >>>
> >> >>> Thanks,
> >> >>>
> >> >>> Ivan
> >> >>>
> >> >>> On Fri, Nov 4, 2016 at 1:24 PM, Cody Koeninger <cody@koeninger.org>
> >> >>> wrote:
> >> >>>>
> >> >>>> So just to be clear, the answers to my questions are
> >> >>>>
> >> >>>> - you are not using different group ids, you're using the same group
> >> >>>> id everywhere
> >> >>>>
> >> >>>> - you are committing offsets manually
> >> >>>>
> >> >>>> Right?
> >> >>>>
> >> >>>> If you want to eliminate network or kafka misbehavior as a source,
> >> >>>> tune poll.ms upwards even higher.
> >> >>>>
> >> >>>> You must use different group ids for different rdds or streams.
> >> >>>> Kafka consumers won't behave the way you expect if they are all in the
> >> >>>> same group id, and the consumer cache is keyed by group id. Yes, the
> >> >>>> executor will tack "spark-executor-" on to the beginning, but if you
> >> >>>> give it the same base group id, it will be the same.  And the driver
> >> >>>> will use the group id you gave it, unmodified.
> >> >>>>
> >> >>>> Finally, I really can't help you if you're manually writing your own
> >> >>>> code to commit offsets directly to Kafka.  Trying to minimize
> >> >>>> duplicates that way doesn't really make sense, your system must be
> >> >>>> able to handle duplicates if you're using kafka as an offsets store,
> >> >>>> it can't do transactional exactly once.
> >> >>>>
> >> >>>> On Fri, Nov 4, 2016 at 1:48 PM, vonnagy <ivan@vadio.com> wrote:
> >> >>>>> Here are some examples and details of the scenarios. The KafkaRDD is
> >> >>>>> the most prone to polling timeouts and concurrent modification errors.
> >> >>>>>
> >> >>>>> *Using KafkaRDD* - This takes a list of channels and processes them
> >> >>>>> in parallel using the KafkaRDD directly. They all use the same
> >> >>>>> consumer group ('storage-group'), but each has its own topic and each
> >> >>>>> topic has 4 partitions. We routinely get timeout errors when polling
> >> >>>>> for data. This occurs whether we process in parallel or sequentially.
> >> >>>>>
> >> >>>>> *Spark Kafka setting:*
> >> >>>>> spark.streaming.kafka.consumer.poll.ms=2000
> >> >>>>>
> >> >>>>> *Kafka Consumer Params:*
> >> >>>>> metric.reporters = []
> >> >>>>> metadata.max.age.ms = 300000
> >> >>>>> partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor]
> >> >>>>> reconnect.backoff.ms = 50
> >> >>>>> sasl.kerberos.ticket.renew.window.factor = 0.8
> >> >>>>> max.partition.fetch.bytes = 1048576
> >> >>>>> bootstrap.servers = [somemachine:31000]
> >> >>>>> ssl.keystore.type = JKS
> >> >>>>> enable.auto.commit = false
> >> >>>>> sasl.mechanism = GSSAPI
> >> >>>>> interceptor.classes = null
> >> >>>>> exclude.internal.topics = true
> >> >>>>> ssl.truststore.password = null
> >> >>>>> client.id =
> >> >>>>> ssl.endpoint.identification.algorithm = null
> >> >>>>> max.poll.records = 1000
> >> >>>>> check.crcs = true
> >> >>>>> request.timeout.ms = 40000
> >> >>>>> heartbeat.interval.ms = 3000
> >> >>>>> auto.commit.interval.ms = 5000
> >> >>>>> receive.buffer.bytes = 65536
> >> >>>>> ssl.truststore.type = JKS
> >> >>>>> ssl.truststore.location = null
> >> >>>>> ssl.keystore.password = null
> >> >>>>> fetch.min.bytes = 1
> >> >>>>> send.buffer.bytes = 131072
> >> >>>>> value.deserializer = class com.vadio.analytics.spark.storage.ClientEventJsonOptionDeserializer
> >> >>>>> group.id = storage-group
> >> >>>>> retry.backoff.ms = 100
> >> >>>>> sasl.kerberos.kinit.cmd = /usr/bin/kinit
> >> >>>>> sasl.kerberos.service.name = null
> >> >>>>> sasl.kerberos.ticket.renew.jitter = 0.05
> >> >>>>> ssl.trustmanager.algorithm = PKIX
> >> >>>>> ssl.key.password = null
> >> >>>>> fetch.max.wait.ms = 500
> >> >>>>> sasl.kerberos.min.time.before.relogin = 60000
> >> >>>>> connections.max.idle.ms = 540000
> >> >>>>> session.timeout.ms = 30000
> >> >>>>> metrics.num.samples = 2
> >> >>>>> key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
> >> >>>>> ssl.protocol = TLS
> >> >>>>> ssl.provider = null
> >> >>>>> ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
> >> >>>>> ssl.keystore.location = null
> >> >>>>> ssl.cipher.suites = null
> >> >>>>> security.protocol = PLAINTEXT
> >> >>>>> ssl.keymanager.algorithm = SunX509
> >> >>>>> metrics.sample.window.ms = 30000
> >> >>>>> auto.offset.reset = earliest
> >> >>>>>
> >> >>>>> *Example usage with KafkaRDD:*
> >> >>>>> val channels = Seq("channel1", "channel2")
> >> >>>>>
> >> >>>>> channels.toParArray.foreach { channel =>
> >> >>>>>  val consumer = new KafkaConsumer[K, V](kafkaParams.asJava)
> >> >>>>>
> >> >>>>>  // Get offsets for the given topic and the consumer group 'storage-group'
> >> >>>>>  val offsetRanges = getOffsets("storage-group", channel)
> >> >>>>>
> >> >>>>>  val ds = KafkaUtils.createRDD[K, V](context,
> >> >>>>>        kafkaParams asJava,
> >> >>>>>        offsetRanges,
> >> >>>>>        PreferConsistent).toDS[V]
> >> >>>>>
> >> >>>>>  // Do some aggregations
> >> >>>>>  ds.agg(...)
> >> >>>>>  // Save the data
> >> >>>>>  ds.write.mode(SaveMode.Append).parquet(somePath)
> >> >>>>>  // Save offsets using a KafkaConsumer
> >> >>>>>  consumer.commitSync(newOffsets.asJava)
> >> >>>>>  consumer.close()
> >> >>>>> }
> >> >>>>>
> >> >>>>>
> >> >>>>> *Example usage with Kafka Stream:*
> >> >>>>> This creates a stream and processes events in each partition. At the
> >> >>>>> end of processing for each partition, we update the offsets for that
> >> >>>>> partition. This is challenging to do, but is better than calling
> >> >>>>> commitAsync on the stream, because that occurs after the /entire/ RDD
> >> >>>>> has been processed. This method minimizes duplicates in an exactly
> >> >>>>> once environment. Since the executors use their own custom group
> >> >>>>> "spark-executor-processor-group" and the commit is buried in private
> >> >>>>> functions, we are unable to use the executors' cached consumer to
> >> >>>>> update the offsets. This requires us to go through multiple steps to
> >> >>>>> update the Kafka offsets accordingly.
> >> >>>>>
> >> >>>>> val offsetRanges = getOffsets("processor-group", "my-topic")
> >> >>>>>
> >> >>>>> val stream = KafkaUtils.createDirectStream[K, V](context,
> >> >>>>>      PreferConsistent,
> >> >>>>>      Subscribe[K, V](Seq("my-topic") asJavaCollection,
> >> >>>>>        kafkaParams,
> >> >>>>>        offsetRanges))
> >> >>>>>
> >> >>>>> stream.foreachRDD { rdd =>
> >> >>>>>    val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
> >> >>>>>
> >> >>>>>    // Transform our data
> >> >>>>>   rdd.foreachPartition { events =>
> >> >>>>>       // Establish a consumer in the executor so we can update offsets
> >> >>>>>       // after each partition. This class is homegrown and uses the
> >> >>>>>       // KafkaConsumer to help get/set offsets
> >> >>>>>       val consumer = new ConsumerUtils(kafkaParams)
> >> >>>>>       // do something with our data
> >> >>>>>
> >> >>>>>       // Write the offsets that were updated in this partition
> >> >>>>>       consumer.setConsumerOffsets("processor-group",
> >> >>>>>          Map(TopicAndPartition(tp.topic, tp.partition) -> endOffset))
> >> >>>>>   }
> >> >>>>> }
> >> >>>>>
> >> >>>>>
> >> >>>>>
> >> >>>>>
> >> >>>
> >> >>>
> >> >>
> >> >>
> >> >
> >
> >
> 
> 
> 


Re: Instability issues with Spark 2.0.1 and Kafka 0.10

Posted by Ivan von Nagy <iv...@vadio.com>.
The code was changed to use a unique group for each KafkaRDD that is
created (see my Nov 10 post). No KafkaRDD is being reused. The basic
logic (see the Nov 10 post for an example) is: get a list of channels,
iterate through them in parallel, load a KafkaRDD using a given topic and a
consumer group derived from the topic (each RDD uses a different topic
and group), process the data, and write to Parquet files.
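
As a minimal sketch of the per-topic group naming described above (plain
Scala, no Spark or Kafka dependency): the names `GroupIds`, `groupIdFor`,
`paramsFor`, and `executorGroupId` are hypothetical and only for
illustration, and the "spark-executor-" prefix is taken from the discussion
quoted in this thread, not checked against the Spark source.

```scala
object GroupIds {
  // Prefix that, per this thread, the executors put in front of the group id.
  val ExecutorPrefix = "spark-executor-"

  /** Hypothetical helper: derive one consumer group per topic. */
  def groupIdFor(prefix: String, topic: String): String = s"$prefix-$topic"

  /** Copy the shared consumer params, overriding only group.id. */
  def paramsFor(base: Map[String, Object],
                prefix: String,
                topic: String): Map[String, Object] =
    base + ("group.id" -> groupIdFor(prefix, topic))

  /** Group id the executor-side consumers would end up using. */
  def executorGroupId(driverGroupId: String): String =
    ExecutorPrefix + driverGroupId
}
```

With a shared base map such as `Map[String, Object]("bootstrap.servers" ->
"somemachine:31000")`, calling `GroupIds.paramsFor(base, "storage", channel)`
once per channel gives every KafkaRDD its own `group.id`, so the executor-side
group ids differ per topic as well.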

Per my Nov 10th post, we still get polling timeouts unless poll.ms is
set to something like 10 seconds. We also still get concurrent modification
exceptions. I believe the key is that the parallel processing of data is
where we encounter these issues, so we are looking for some possible
answers surrounding this.
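
To illustrate the code path in the stack trace quoted below
(`LinkedHashMap.afterNodeInsertion` -> `removeEldestEntry` ->
`KafkaConsumer.close`), here is a standard-library-only sketch of an LRU
cache that closes its eldest entry on insertion. This is not Spark's actual
`CachedKafkaConsumer` code; `FakeConsumer` and `ConsumerCache` are
hypothetical stand-ins. It only shows that the eviction, and therefore the
`close()`, runs on whichever thread inserts a new entry, which may be my
guess at why a consumer still in use elsewhere could be closed from another
thread.

```scala
import java.{util => ju}

object LruEvictionSketch {
  /** Stand-in for a consumer; records which thread closed it. */
  final class FakeConsumer(val name: String) {
    @volatile var closedBy: Option[String] = None
    def close(): Unit = closedBy = Some(Thread.currentThread().getName)
  }

  /** Access-ordered map that closes and drops the eldest entry
    * once more than maxCapacity entries are present. */
  final class ConsumerCache(maxCapacity: Int)
      extends ju.LinkedHashMap[String, FakeConsumer](16, 0.75f, true) {
    override def removeEldestEntry(
        eldest: ju.Map.Entry[String, FakeConsumer]): Boolean =
      if (size() > maxCapacity) {
        // Runs on the inserting thread, regardless of who uses `eldest`.
        eldest.getValue.close()
        true
      } else false
  }
}
```

For example, with `new LruEvictionSketch.ConsumerCache(1)`, putting a
consumer for "topicA-0" and then one for "topicB-0" closes the first one
during the second `put`. A larger cache capacity makes this eviction less
likely.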

Thanks,

Ivan


On Fri, Nov 11, 2016 at 12:12 PM, Cody Koeninger <co...@koeninger.org> wrote:

> It is already documented that you must use a different group id, which as
> far as I can tell you are still not doing.
>
> On Nov 10, 2016 7:43 PM, "Shixiong(Ryan) Zhu" <sh...@databricks.com>
> wrote:
>
>> Yeah, the KafkaRDD cannot be reused. It's better to document it.
>>
>> On Thu, Nov 10, 2016 at 8:26 AM, Ivan von Nagy <iv...@vadio.com> wrote:
>>
>>> Ok, I have split the KafkaRDD logic so that each uses its own group and
>>> bumped the poll.ms to 10 seconds. Anything less than 2 seconds on the
>>> poll.ms ends up with a timeout and exception so I am still perplexed on
>>> that one. The new error I am getting now is a `ConcurrentModificationException`
>>> when Spark is trying to remove the CachedKafkaConsumer.
>>>
>>> java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access
>>> at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1431)
>>> at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:1361)
>>> at org.apache.spark.streaming.kafka010.CachedKafkaConsumer$$anon$1.removeEldestEntry(CachedKafkaConsumer.scala:128)
>>> at java.util.LinkedHashMap.afterNodeInsertion(LinkedHashMap.java:299)
>>>
>>> Here is the basic logic:
>>>
>>> *Using KafkaRDD* - This takes a list of channels and processes them in
>>> parallel using the KafkaRDD directly. They each use a distinct consumer
>>> group (s"$prefix-$topic"), and each has its own topic and each topic
>>> has 4 partitions. We routinely get timeout errors when polling for data
>>> when the poll.ms is less than 2 seconds. This occurs whether or not we
>>> process in parallel.
>>>
>>> *Example usage with KafkaRDD:*
>>> val channels = Seq("channel1", "channel2")
>>>
>>> channels.toParArray.foreach { channel =>
>>>   val consumer = new KafkaConsumer[K, V](kafkaParams.asJava)
>>>
>>>   // Get offsets for the given topic and the consumer group "$prefix-$channel"
>>>   val offsetRanges = getOffsets(s"$prefix-$channel", channel)
>>>
>>>   val ds = KafkaUtils.createRDD[K, V](context,
>>>         kafkaParams asJava,
>>>         offsetRanges,
>>>         PreferConsistent).toDS[V]
>>>
>>>   // Do some aggregations
>>>   ds.agg(...)
>>>   // Save the data
>>>   ds.write.mode(SaveMode.Append).parquet(somePath)
>>>   // Save offsets using a KafkaConsumer
>>>   consumer.commitSync(newOffsets.asJava)
>>>   consumer.close()
>>> }
>>>
>>> I am not sure why the concurrent issue is there as I have tried to debug
>>> and also looked at the KafkaConsumer code, but everything looks like it
>>> should not occur. The things to figure out are why this occurs when
>>> running in parallel and why the timeouts still occur.
>>>
>>> Thanks,
>>>
>>> Ivan
>>>
>>> On Mon, Nov 7, 2016 at 11:55 AM, Cody Koeninger <co...@koeninger.org>
>>> wrote:
>>>
>>>> There definitely is Kafka documentation indicating that you should use
>>>> a different consumer group for logically different subscribers, this
>>>> is really basic to Kafka:
>>>>
>>>> http://kafka.apache.org/documentation#intro_consumers
>>>>
>>>> As for your comment that "commit async after each RDD, which is not
>>>> really viable also", how is it not viable?  Again, committing offsets
>>>> to Kafka doesn't give you reliable delivery semantics unless your
>>>> downstream data store is idempotent.  If your downstream data store is
>>>> idempotent, then it shouldn't matter to you when offset commits
>>>> happen, as long as they happen within a reasonable time after the data
>>>> is written.
>>>>
>>>> Do you want to keep arguing with me, or follow my advice and proceed
>>>> with debugging any remaining issues after you make the changes I
>>>> suggested?
>>>>
>>>> On Mon, Nov 7, 2016 at 1:35 PM, Ivan von Nagy <iv...@vadio.com> wrote:
>>>> > With our stream version, we update the offsets for only the partition
>>>> > we are operating on. We even break down the partition into smaller
>>>> > batches and then update the offsets after each batch within the
>>>> > partition. With Spark 1.6 and Kafka 0.8.x this was not an issue, and as
>>>> > Sean pointed out, this is not necessarily a Spark issue since Kafka no
>>>> > longer allows you to simply update the offsets for a given consumer
>>>> > group. You have to subscribe or assign partitions to even do so.
>>>> >
>>>> > As for storing the offsets in some other place like a DB, I don't find
>>>> > this useful because you then can't use tools like Kafka Manager. In
>>>> > order to do so you would have to store in a DB and then circle back and
>>>> > update Kafka afterwards. This means you have to keep two sources in
>>>> > sync, which is not really a good idea.
>>>> >
>>>> > It is a challenge in Spark to use the Kafka offsets since the driver
>>>> > stays subscribed to the topic(s) and consumer group, while the
>>>> > executors prepend "spark-executor-" to the consumer group. The stream
>>>> > (driver) does allow you to commit async after each RDD, which is not
>>>> > really viable also. I have thought of implementing an Akka actor system
>>>> > on the driver and sending it messages from the executor code to update
>>>> > the offsets, but then that is asynchronous as well so not really a good
>>>> > solution.
>>>> >
>>>> > I have no idea why Kafka made this change and also why in the parallel
>>>> > KafkaRDD application we would be advised to use different consumer
>>>> > groups for each RDD. It seems strange to me that different consumer
>>>> > groups would be required or advised. There is no Kafka documentation
>>>> > that I know of that states this. The biggest issue I see with the
>>>> > parallel KafkaRDD is the timeouts. I have tried to set poll.ms to 30
>>>> > seconds and still get the issue. Something is not right here. As I
>>>> > mentioned with the streaming application, with Spark 1.6 and Kafka
>>>> > 0.8.x we never saw this issue. We have been running the same basic
>>>> > logic for over a year now without one hitch at all.
>>>> >
>>>> > Ivan
>>>> >
>>>> >
>>>> > On Mon, Nov 7, 2016 at 11:16 AM, Cody Koeninger <co...@koeninger.org> wrote:
>>>> >>
>>>> >> Someone can correct me, but I'm pretty sure Spark dstreams (in
>>>> >> general, not just kafka) have been progressing on to the next batch
>>>> >> after a given batch aborts for quite some time now.  Yet another
>>>> >> reason I put offsets in my database transactionally.  My jobs throw
>>>> >> exceptions if the offset in the DB isn't what I expected it to be.
>>>> >>
>>>> >>
>>>> >>
>>>> >>
>>>> >> On Mon, Nov 7, 2016 at 1:08 PM, Sean McKibben <gr...@graphex.com> wrote:
>>>> >> > I've been encountering the same kinds of timeout issues as Ivan, using
>>>> >> > the "Kafka Stream" approach that he is using, except I'm storing my
>>>> >> > offsets manually from the driver to Zookeeper in the Kafka 8 format. I
>>>> >> > haven't yet implemented the KafkaRDD approach, and therefore don't have
>>>> >> > the concurrency issues, but a very similar use case is coming up for me
>>>> >> > soon, it's just been backburnered until I can get streaming to be more
>>>> >> > reliable (I will definitely ensure unique group IDs when I do). Offset
>>>> >> > commits are certainly more painful in Kafka 0.10, and that doesn't have
>>>> >> > anything to do with Spark.
>>>> >> >
>>>> >> > While I may be able to alleviate the timeout by just increasing it, I've
>>>> >> > noticed something else that is more worrying: When one task fails 4
>>>> >> > times in a row (i.e. "Failed to get records for _ after polling for _"),
>>>> >> > Spark aborts the Stage and Job with "Job aborted due to stage failure:
>>>> >> > Task _ in stage _ failed 4 times". That's fine, and it's the behavior I
>>>> >> > want, but instead of stopping the Application there (as previous
>>>> >> > versions of Spark did) the next microbatch marches on and offsets are
>>>> >> > committed ahead of the failed microbatch. Suddenly my at-least-once app
>>>> >> > becomes more sometimes-at-least-once which is no good. In order for
>>>> >> > Spark to display that failure, I must be propagating the errors up to
>>>> >> > Spark, but the behavior of marching forward with the next microbatch
>>>> >> > seems to be new, and a big potential for data loss in streaming
>>>> >> > applications.
>>>> >> >
>>>> >> > Am I perhaps missing a setting to stop the entire streaming application
>>>> >> > once spark.task.maxFailures is reached? Has anyone else seen this
>>>> >> > behavior of a streaming application skipping over failed microbatches?
>>>> >> >
>>>> >> > Thanks,
>>>> >> > Sean
>>>> >> >
>>>> >> >
>>>> >> >> On Nov 4, 2016, at 2:48 PM, Cody Koeninger <co...@koeninger.org> wrote:
>>>> >> >>
>>>> >> >> So basically what I am saying is
>>>> >> >>
>>>> >> >> - increase poll.ms
>>>> >> >> - use a separate group id everywhere
>>>> >> >> - stop committing offsets under the covers
>>>> >> >>
>>>> >> >> That should eliminate all of those as possible causes, and then we
>>>> >> >> can see if there are still issues.
>>>> >> >>
>>>> >> >> As far as 0.8 vs 0.10, Spark doesn't require you to assign or
>>>> >> >> subscribe to a topic in order to update offsets, Kafka does.  If you
>>>> >> >> don't like the new Kafka consumer api, the existing 0.8 simple
>>>> >> >> consumer api should be usable with later brokers.  As long as you
>>>> >> >> don't need SSL or dynamic subscriptions, and it meets your needs, keep
>>>> >> >> using it.
>>>> >> >>
>>>> >> >> On Fri, Nov 4, 2016 at 3:37 PM, Ivan von Nagy <iv...@vadio.com> wrote:
>>>> >> >>> Yes, the parallel KafkaRDD uses the same consumer group, but each RDD
>>>> >> >>> uses a single distinct topic. For example, the group would be something
>>>> >> >>> like "storage-group", and the topics would be "storage-channel1" and
>>>> >> >>> "storage-channel2". In each thread a KafkaConsumer is started, assigned
>>>> >> >>> its partitions, and then commits offsets after the RDD is processed.
>>>> >> >>> This should not interfere with the consumer group used by the
>>>> >> >>> executors, which would be "spark-executor-storage-group".
>>>> >> >>>
>>>> >> >>> In the streaming example there is a single topic ("client-events") and
>>>> >> >>> group ("processing-group"). A single stream is created and offsets are
>>>> >> >>> manually updated from the executor after each partition is handled. This
>>>> >> >>> was a challenge since Spark now requires one to assign or subscribe to a
>>>> >> >>> topic in order to even update the offsets. In 0.8.2.x you did not have
>>>> >> >>> to worry about that. This approach limits your exposure to duplicate
>>>> >> >>> data since idempotent records are not entirely possible in our
>>>> >> >>> scenario, at least not without a lot of re-running of logic to de-dup.
>>>> >> >>>
>>>> >> >>> Thanks,
>>>> >> >>>
>>>> >> >>> Ivan
>>>> >> >>>
>>>> >> >>> On Fri, Nov 4, 2016 at 1:24 PM, Cody Koeninger <cody@koeninger.org>
>>>> >> >>> wrote:
>>>> >> >>>>
>>>> >> >>>> So just to be clear, the answers to my questions are
>>>> >> >>>>
>>>> >> >>>> - you are not using different group ids, you're using the same group
>>>> >> >>>> id everywhere
>>>> >> >>>>
>>>> >> >>>> - you are committing offsets manually
>>>> >> >>>>
>>>> >> >>>> Right?
>>>> >> >>>>
>>>> >> >>>> If you want to eliminate network or kafka misbehavior as a source,
>>>> >> >>>> tune poll.ms upwards even higher.
>>>> >> >>>>
>>>> >> >>>> You must use different group ids for different rdds or streams.
>>>> >> >>>> Kafka consumers won't behave the way you expect if they are all in the
>>>> >> >>>> same group id, and the consumer cache is keyed by group id. Yes, the
>>>> >> >>>> executor will tack "spark-executor-" on to the beginning, but if you
>>>> >> >>>> give it the same base group id, it will be the same.  And the driver
>>>> >> >>>> will use the group id you gave it, unmodified.
>>>> >> >>>>
>>>> >> >>>> Finally, I really can't help you if you're manually writing your own
>>>> >> >>>> code to commit offsets directly to Kafka.  Trying to minimize
>>>> >> >>>> duplicates that way doesn't really make sense, your system must be
>>>> >> >>>> able to handle duplicates if you're using kafka as an offsets store,
>>>> >> >>>> it can't do transactional exactly once.
>>>> >> >>>>
>>>> >> >>>> On Fri, Nov 4, 2016 at 1:48 PM, vonnagy <iv...@vadio.com> wrote:
>>>> >> >>>>> Here are some examples and details of the scenarios. The KafkaRDD is
>>>> >> >>>>> the most prone to polling timeouts and concurrent modification errors.
>>>> >> >>>>>
>>>> >> >>>>> *Using KafkaRDD* - This takes a list of channels and processes them
>>>> >> >>>>> in parallel using the KafkaRDD directly. They all use the same
>>>> >> >>>>> consumer group ('storage-group'), but each has its own topic and each
>>>> >> >>>>> topic has 4 partitions. We routinely get timeout errors when polling
>>>> >> >>>>> for data. This occurs whether we process in parallel or sequentially.
>>>> >> >>>>>
>>>> >> >>>>> *Spark Kafka setting:*
>>>> >> >>>>> spark.streaming.kafka.consumer.poll.ms=2000
>>>> >> >>>>>
>>>> >> >>>>> *Kafka Consumer Params:*
>>>> >> >>>>> metric.reporters = []
>>>> >> >>>>> metadata.max.age.ms = 300000
>>>> >> >>>>> partition.assignment.strategy =
>>>> >> >>>>> [org.apache.kafka.clients.consumer.RangeAssignor]
>>>> >> >>>>> reconnect.backoff.ms = 50
>>>> >> >>>>> sasl.kerberos.ticket.renew.window.factor = 0.8
>>>> >> >>>>> max.partition.fetch.bytes = 1048576
>>>> >> >>>>> bootstrap.servers = [somemachine:31000]
>>>> >> >>>>> ssl.keystore.type = JKS
>>>> >> >>>>> enable.auto.commit = false
>>>> >> >>>>> sasl.mechanism = GSSAPI
>>>> >> >>>>> interceptor.classes = null
>>>> >> >>>>> exclude.internal.topics = true
>>>> >> >>>>> ssl.truststore.password = null
>>>> >> >>>>> client.id =
>>>> >> >>>>> ssl.endpoint.identification.algorithm = null
>>>> >> >>>>> max.poll.records = 1000
>>>> >> >>>>> check.crcs = true
>>>> >> >>>>> request.timeout.ms = 40000
>>>> >> >>>>> heartbeat.interval.ms = 3000
>>>> >> >>>>> auto.commit.interval.ms = 5000
>>>> >> >>>>> receive.buffer.bytes = 65536
>>>> >> >>>>> ssl.truststore.type = JKS
>>>> >> >>>>> ssl.truststore.location = null
>>>> >> >>>>> ssl.keystore.password = null
>>>> >> >>>>> fetch.min.bytes = 1
>>>> >> >>>>> send.buffer.bytes = 131072
>>>> >> >>>>> value.deserializer = class com.vadio.analytics.spark.storage.ClientEventJsonOptionDeserializer
>>>> >> >>>>> group.id = storage-group
>>>> >> >>>>> retry.backoff.ms = 100
>>>> >> >>>>> sasl.kerberos.kinit.cmd = /usr/bin/kinit
>>>> >> >>>>> sasl.kerberos.service.name = null
>>>> >> >>>>> sasl.kerberos.ticket.renew.jitter = 0.05
>>>> >> >>>>> ssl.trustmanager.algorithm = PKIX
>>>> >> >>>>> ssl.key.password = null
>>>> >> >>>>> fetch.max.wait.ms = 500
>>>> >> >>>>> sasl.kerberos.min.time.before.relogin = 60000
>>>> >> >>>>> connections.max.idle.ms = 540000
>>>> >> >>>>> session.timeout.ms = 30000
>>>> >> >>>>> metrics.num.samples = 2
>>>> >> >>>>> key.deserializer = class
>>>> >> >>>>> org.apache.kafka.common.serialization.StringDeserializer
>>>> >> >>>>> ssl.protocol = TLS
>>>> >> >>>>> ssl.provider = null
>>>> >> >>>>> ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
>>>> >> >>>>> ssl.keystore.location = null
>>>> >> >>>>> ssl.cipher.suites = null
>>>> >> >>>>> security.protocol = PLAINTEXT
>>>> >> >>>>> ssl.keymanager.algorithm = SunX509
>>>> >> >>>>> metrics.sample.window.ms = 30000
>>>> >> >>>>> auto.offset.reset = earliest
>>>> >> >>>>>
>>>> >> >>>>> *Example usage with KafkaRDD:*
>>>> >> >>>>> val channels = Seq("channel1", "channel2")
>>>> >> >>>>>
>>>> >> >>>>> channels.toParArray.foreach { channel =>
>>>> >> >>>>>  val consumer = new KafkaConsumer[K, V](kafkaParams.asJava)
>>>> >> >>>>>
>>>> >> >>>>>  // Get offsets for the given topic and the consumer group 'storage-group'
>>>> >> >>>>>  val offsetRanges = getOffsets("storage-group", channel)
>>>> >> >>>>>
>>>> >> >>>>>  val ds = KafkaUtils.createRDD[K, V](context,
>>>> >> >>>>>        kafkaParams asJava,
>>>> >> >>>>>        offsetRanges,
>>>> >> >>>>>        PreferConsistent).toDS[V]
>>>> >> >>>>>
>>>> >> >>>>>  // Do some aggregations
>>>> >> >>>>>  ds.agg(...)
>>>> >> >>>>>  // Save the data
>>>> >> >>>>>  ds.write.mode(SaveMode.Append).parquet(somePath)
>>>> >> >>>>>  // Save offsets using a KafkaConsumer
>>>> >> >>>>>  consumer.commitSync(newOffsets.asJava)
>>>> >> >>>>>  consumer.close()
>>>> >> >>>>> }
>>>> >> >>>>>
>>>> >> >>>>>
>>>> >> >>>>> *Example usage with Kafka Stream:*
>>>> >> >>>>> This creates a stream and processes events in each partition. At the
>>>> >> >>>>> end of processing for each partition, we update the offsets for that
>>>> >> >>>>> partition. This is challenging to do, but is better than calling
>>>> >> >>>>> commitAsync on the stream, because that occurs after the /entire/ RDD
>>>> >> >>>>> has been processed. This method minimizes duplicates in an exactly
>>>> >> >>>>> once environment. Since the executors use their own custom group
>>>> >> >>>>> "spark-executor-processor-group" and the commit is buried in private
>>>> >> >>>>> functions, we are unable to use the executors' cached consumer to
>>>> >> >>>>> update the offsets. This requires us to go through multiple steps to
>>>> >> >>>>> update the Kafka offsets accordingly.
>>>> >> >>>>>
>>>> >> >>>>> val offsetRanges = getOffsets("processor-group", "my-topic")
>>>> >> >>>>>
>>>> >> >>>>> val stream = KafkaUtils.createDirectStream[K, V](context,
>>>> >> >>>>>      PreferConsistent,
>>>> >> >>>>>      Subscribe[K, V](Seq("my-topic") asJavaCollection,
>>>> >> >>>>>        kafkaParams,
>>>> >> >>>>>        offsetRanges))
>>>> >> >>>>>
>>>> >> >>>>> stream.foreachRDD { rdd =>
>>>> >> >>>>>    val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
>>>> >> >>>>>
>>>> >> >>>>>    // Transform our data
>>>> >> >>>>>   rdd.foreachPartition { events =>
>>>> >> >>>>>       // Establish a consumer in the executor so we can update offsets
>>>> >> >>>>>       // after each partition. This class is homegrown and uses the
>>>> >> >>>>>       // KafkaConsumer to help get/set offsets
>>>> >> >>>>>       val consumer = new ConsumerUtils(kafkaParams)
>>>> >> >>>>>       // do something with our data
>>>> >> >>>>>
>>>> >> >>>>>       // Write the offsets that were updated in this partition
>>>> >> >>>>>       consumer.setConsumerOffsets("processor-group",
>>>> >> >>>>>          Map(TopicAndPartition(tp.topic, tp.partition) -> endOffset))
>>>> >> >>>>>   }
>>>> >> >>>>> }
>>>> >> >>>>>
>>>> >> >>>>>
>>>> >> >>>>>
>>>> >> >>>>>
>>>> >> >>>
>>>> >> >>>
>>>> >> >>
>>>> >> >>
>>>> >> >
>>>> >
>>>> >
>>>>
>>>
>>>
>>

Re: Instability issues with Spark 2.0.1 and Kafka 0.10

Posted by Cody Koeninger <co...@koeninger.org>.
It is already documented that you must use a different group id, which as
far as I can tell you are still not doing.

On Nov 10, 2016 7:43 PM, "Shixiong(Ryan) Zhu" <sh...@databricks.com>
wrote:

> Yeah, the KafkaRDD cannot be reused. It's better to document it.
>
> On Thu, Nov 10, 2016 at 8:26 AM, Ivan von Nagy <iv...@vadio.com> wrote:
>
>> Ok, I have split the KafkaRDD logic so that each uses its own group and
>> bumped the poll.ms to 10 seconds. Anything less than 2 seconds on the
>> poll.ms ends up with a timeout and exception so I am still perplexed on
>> that one. The new error I am getting now is a `ConcurrentModificationException`
>> when Spark is trying to remove the CachedKafkaConsumer.
>>
>> java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access
>> at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1431)
>> at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:1361)
>> at org.apache.spark.streaming.kafka010.CachedKafkaConsumer$$anon$1.removeEldestEntry(CachedKafkaConsumer.scala:128)
>> at java.util.LinkedHashMap.afterNodeInsertion(LinkedHashMap.java:299)
>>
>> Here is the basic logic:
>>
>> *Using KafkaRDD* - This takes a list of channels and processes them in
>> parallel using the KafkaRDD directly. They each use a distinct consumer
>> group (s"$prefix-$topic"), and each has its own topic and each topic
>> has 4 partitions. We routinely get timeout errors when polling for data
>> when the poll.ms is less than 2 seconds. This occurs whether or not we
>> process in parallel.
>>
>> *Example usage with KafkaRDD:*
>> val channels = Seq("channel1", "channel2")
>>
>> channels.toParArray.foreach { channel =>
>>   val consumer = new KafkaConsumer[K, V](kafkaParams.asJava)
>>
>>   // Get offsets for the given topic and the consumer group "$prefix-$channel"
>>   val offsetRanges = getOffsets(s"$prefix-$channel", channel)
>>
>>   val ds = KafkaUtils.createRDD[K, V](context,
>>         kafkaParams asJava,
>>         offsetRanges,
>>         PreferConsistent).toDS[V]
>>
>>   // Do some aggregations
>>   ds.agg(...)
>>   // Save the data
>>   ds.write.mode(SaveMode.Append).parquet(somePath)
>>   // Save offsets using a KafkaConsumer
>>   consumer.commitSync(newOffsets.asJava)
>>   consumer.close()
>> }
>>
>> I am not sure why the concurrency issue is there. I have tried to debug it
>> and have also looked at the KafkaConsumer code, and from what I can see it
>> should not occur. The things to figure out are why this occurs when running
>> in parallel, and why the timeouts still occur.
>>
>> Thanks,
>>
>> Ivan
>>
>> On Mon, Nov 7, 2016 at 11:55 AM, Cody Koeninger <co...@koeninger.org>
>> wrote:
>>
>>> There definitely is Kafka documentation indicating that you should use
>>> a different consumer group for logically different subscribers, this
>>> is really basic to Kafka:
>>>
>>> http://kafka.apache.org/documentation#intro_consumers
>>>
>>> As for your comment that "commit async after each RDD, which is not
>>> really viable also", how is it not viable?  Again, committing offsets
>>> to Kafka doesn't give you reliable delivery semantics unless your
>>> downstream data store is idempotent.  If your downstream data store is
>>> idempotent, then it shouldn't matter to you when offset commits
>>> happen, as long as they happen within a reasonable time after the data
>>> is written.
>>>
>>> Do you want to keep arguing with me, or follow my advice and proceed
>>> with debugging any remaining issues after you make the changes I
>>> suggested?
>>>
>>> On Mon, Nov 7, 2016 at 1:35 PM, Ivan von Nagy <iv...@vadio.com> wrote:
>>> > With our stream version, we update the offsets for only the partition
>>> > we are operating on. We even break down the partition into smaller
>>> > batches and then update the offsets after each batch within the
>>> > partition. With Spark 1.6 and Kafka 0.8.x this was not an issue, and as
>>> > Sean pointed out, this is not necessarily a Spark issue since Kafka no
>>> > longer allows you to simply update the offsets for a given consumer
>>> > group. You have to subscribe or assign partitions to even do so.
>>> >
>>> > As for storing the offsets in some other place like a DB, I don't find
>>> > this useful because you then can't use tools like Kafka Manager. In
>>> > order to do so you would have to store in a DB and then circle back and
>>> > update Kafka afterwards. This means you have to keep two sources in
>>> > sync, which is not really a good idea.
>>> >
>>> > It is a challenge in Spark to use the Kafka offsets since the driver
>>> > keeps subscribed to the topic(s) and consumer group, while the executors
>>> > prepend "spark-executor-" to the consumer group. The stream (driver)
>>> > does allow you to commit async after each RDD, which is not really
>>> > viable also. I have thought of implementing an Akka actor system on the
>>> > driver and sending it messages from the executor code to update the
>>> > offsets, but then that is asynchronous as well so not really a good
>>> > solution.
>>> >
>>> > I have no idea why Kafka made this change and also why in the parallel
>>> > KafkaRDD application we would be advised to use different consumer
>>> > groups for each RDD. It seems strange to me that different consumer
>>> > groups would be required or advised. There is no Kafka documentation
>>> > that I know of that states this. The biggest issue I see with the
>>> > parallel KafkaRDD is the timeouts. I have tried to set poll.ms to 30
>>> > seconds and still get the issue. Something does not seem right here. As
>>> > I mentioned with the streaming application, with Spark 1.6 and Kafka
>>> > 0.8.x we never saw this issue. We have been running the same basic
>>> > logic for over a year now without one hitch at all.
>>> >
>>> > Ivan
>>> >
>>> >
>>> > On Mon, Nov 7, 2016 at 11:16 AM, Cody Koeninger <co...@koeninger.org>
>>> wrote:
>>> >>
>>> >> Someone can correct me, but I'm pretty sure Spark dstreams (in
>>> >> general, not just kafka) have been progressing on to the next batch
>>> >> after a given batch aborts for quite some time now.  Yet another
>>> >> reason I put offsets in my database transactionally.  My jobs throw
>>> >> exceptions if the offset in the DB isn't what I expected it to be.
>>> >>
>>> >>
>>> >>
>>> >>
>>> >> On Mon, Nov 7, 2016 at 1:08 PM, Sean McKibben <gr...@graphex.com>
>>> wrote:
>>> >> > I've been encountering the same kinds of timeout issues as Ivan,
>>> >> > using the "Kafka Stream" approach that he is using, except I'm
>>> >> > storing my offsets manually from the driver to Zookeeper in the
>>> >> > Kafka 8 format. I haven't yet implemented the KafkaRDD approach, and
>>> >> > therefore don't have the concurrency issues, but a very similar use
>>> >> > case is coming up for me soon, it's just been backburnered until I
>>> >> > can get streaming to be more reliable (I will definitely ensure
>>> >> > unique group IDs when I do). Offset commits are certainly more
>>> >> > painful in Kafka 0.10, and that doesn't have anything to do with
>>> >> > Spark.
>>> >> >
>>> >> > While I may be able to alleviate the timeout by just increasing it,
>>> >> > I've noticed something else that is more worrying: When one task
>>> >> > fails 4 times in a row (i.e. "Failed to get records for _ after
>>> >> > polling for _"), Spark aborts the Stage and Job with "Job aborted
>>> >> > due to stage failure: Task _ in stage _ failed 4 times". That's
>>> >> > fine, and it's the behavior I want, but instead of stopping the
>>> >> > Application there (as previous versions of Spark did) the next
>>> >> > microbatch marches on and offsets are committed ahead of the failed
>>> >> > microbatch. Suddenly my at-least-once app becomes more
>>> >> > sometimes-at-least-once which is no good. In order for spark to
>>> >> > display that failure, I must be propagating the errors up to Spark,
>>> >> > but the behavior of marching forward with the next microbatch seems
>>> >> > to be new, and a big potential for data loss in streaming
>>> >> > applications.
>>> >> >
>>> >> > Am I perhaps missing a setting to stop the entire streaming
>>> >> > application once spark.task.maxFailures is reached? Has anyone else
>>> >> > seen this behavior of a streaming application skipping over failed
>>> >> > microbatches?
>>> >> >
>>> >> > Thanks,
>>> >> > Sean
>>> >> >
>>> >> >
>>> >> >> On Nov 4, 2016, at 2:48 PM, Cody Koeninger <co...@koeninger.org>
>>> wrote:
>>> >> >>
>>> >> >> So basically what I am saying is
>>> >> >>
>>> >> >> - increase poll.ms
>>> >> >> - use a separate group id everywhere
>>> >> >> - stop committing offsets under the covers
>>> >> >>
>>> >> >> That should eliminate all of those as possible causes, and then we
>>> >> >> can see if there are still issues.
>>> >> >>
>>> >> >> As far as 0.8 vs 0.10, Spark doesn't require you to assign or
>>> >> >> subscribe to a topic in order to update offsets, Kafka does.  If
>>> >> >> you don't like the new Kafka consumer api, the existing 0.8 simple
>>> >> >> consumer api should be usable with later brokers.  As long as you
>>> >> >> don't need SSL or dynamic subscriptions, and it meets your needs,
>>> >> >> keep using it.
>>> >> >>
>>> >> >> On Fri, Nov 4, 2016 at 3:37 PM, Ivan von Nagy <iv...@vadio.com>
>>> wrote:
>>> >> >>> Yes, the parallel KafkaRDD uses the same consumer group, but each
>>> >> >>> RDD uses a single distinct topic. For example, the group would be
>>> >> >>> something like "storage-group", and the topics would be
>>> >> >>> "storage-channel1" and "storage-channel2". In each thread a
>>> >> >>> KafkaConsumer is started and assigned its partitions, and then the
>>> >> >>> offsets are committed after the RDD is processed. This should not
>>> >> >>> interfere with the consumer group used by the executors, which
>>> >> >>> would be "spark-executor-storage-group".
>>> >> >>>
>>> >> >>> In the streaming example there is a single topic ("client-events")
>>> >> >>> and group ("processing-group"). A single stream is created and
>>> >> >>> offsets are manually updated from the executor after each partition
>>> >> >>> is handled. This was a challenge since Spark now requires one to
>>> >> >>> assign or subscribe to a topic in order to even update the offsets.
>>> >> >>> In 0.8.2.x you did not have to worry about that. This approach
>>> >> >>> limits your exposure to duplicate data since idempotent records are
>>> >> >>> not entirely possible in our scenario, at least not without a lot
>>> >> >>> of re-running of logic to de-dup.
>>> >> >>>
>>> >> >>> Thanks,
>>> >> >>>
>>> >> >>> Ivan
>>> >> >>>
>>> >> >>> On Fri, Nov 4, 2016 at 1:24 PM, Cody Koeninger <
>>> cody@koeninger.org>
>>> >> >>> wrote:
>>> >> >>>>
>>> >> >>>> So just to be clear, the answers to my questions are
>>> >> >>>>
>>> >> >>>> - you are not using different group ids, you're using the same
>>> >> >>>> group id everywhere
>>> >> >>>>
>>> >> >>>> - you are committing offsets manually
>>> >> >>>>
>>> >> >>>> Right?
>>> >> >>>>
>>> >> >>>> If you want to eliminate network or kafka misbehavior as a
>>> >> >>>> source, tune poll.ms upwards even higher.
>>> >> >>>>
>>> >> >>>> You must use different group ids for different rdds or streams.
>>> >> >>>> Kafka consumers won't behave the way you expect if they are all
>>> >> >>>> in the same group id, and the consumer cache is keyed by group id.
>>> >> >>>> Yes, the executor will tack "spark-executor-" on to the beginning,
>>> >> >>>> but if you give it the same base group id, it will be the same.
>>> >> >>>> And the driver will use the group id you gave it, unmodified.
>>> >> >>>>
>>> >> >>>> Finally, I really can't help you if you're manually writing your
>>> >> >>>> own code to commit offsets directly to Kafka.  Trying to minimize
>>> >> >>>> duplicates that way doesn't really make sense, your system must
>>> >> >>>> be able to handle duplicates if you're using kafka as an offsets
>>> >> >>>> store, it can't do transactional exactly once.
>>> >> >>>>
>>> >> >>>> On Fri, Nov 4, 2016 at 1:48 PM, vonnagy <iv...@vadio.com> wrote:
>>> >> >>>>> Here are some examples and details of the scenarios. The KafkaRDD
>>> >> >>>>> is the most prone to polling timeouts and concurrent modification
>>> >> >>>>> errors.
>>> >> >>>>>
>>> >> >>>>> *Using KafkaRDD* - This takes a list of channels and processes
>>> >> >>>>> them in parallel using the KafkaRDD directly. They all use the
>>> >> >>>>> same consumer group ('storage-group'), but each has its own topic
>>> >> >>>>> and each topic has 4 partitions. We routinely get timeout errors
>>> >> >>>>> when polling for data. This occurs whether we process in parallel
>>> >> >>>>> or sequentially.
>>> >> >>>>>
>>> >> >>>>> *Spark Kafka setting:*
>>> >> >>>>> spark.streaming.kafka.consumer.poll.ms=2000
>>> >> >>>>>
>>> >> >>>>> *Kafka Consumer Params:*
>>> >> >>>>> metric.reporters = []
>>> >> >>>>> metadata.max.age.ms = 300000
>>> >> >>>>> partition.assignment.strategy =
>>> >> >>>>> [org.apache.kafka.clients.consumer.RangeAssignor]
>>> >> >>>>> reconnect.backoff.ms = 50
>>> >> >>>>> sasl.kerberos.ticket.renew.window.factor = 0.8
>>> >> >>>>> max.partition.fetch.bytes = 1048576
>>> >> >>>>> bootstrap.servers = [somemachine:31000]
>>> >> >>>>> ssl.keystore.type = JKS
>>> >> >>>>> enable.auto.commit = false
>>> >> >>>>> sasl.mechanism = GSSAPI
>>> >> >>>>> interceptor.classes = null
>>> >> >>>>> exclude.internal.topics = true
>>> >> >>>>> ssl.truststore.password = null
>>> >> >>>>> client.id =
>>> >> >>>>> ssl.endpoint.identification.algorithm = null
>>> >> >>>>> max.poll.records = 1000
>>> >> >>>>> check.crcs = true
>>> >> >>>>> request.timeout.ms = 40000
>>> >> >>>>> heartbeat.interval.ms = 3000
>>> >> >>>>> auto.commit.interval.ms = 5000
>>> >> >>>>> receive.buffer.bytes = 65536
>>> >> >>>>> ssl.truststore.type = JKS
>>> >> >>>>> ssl.truststore.location = null
>>> >> >>>>> ssl.keystore.password = null
>>> >> >>>>> fetch.min.bytes = 1
>>> >> >>>>> send.buffer.bytes = 131072
>>> >> >>>>> value.deserializer = class
>>> >> >>>>> com.vadio.analytics.spark.storage.ClientEventJsonOptionDeserializer
>>> >> >>>>> group.id = storage-group
>>> >> >>>>> retry.backoff.ms = 100
>>> >> >>>>> sasl.kerberos.kinit.cmd = /usr/bin/kinit
>>> >> >>>>> sasl.kerberos.service.name = null
>>> >> >>>>> sasl.kerberos.ticket.renew.jitter = 0.05
>>> >> >>>>> ssl.trustmanager.algorithm = PKIX
>>> >> >>>>> ssl.key.password = null
>>> >> >>>>> fetch.max.wait.ms = 500
>>> >> >>>>> sasl.kerberos.min.time.before.relogin = 60000
>>> >> >>>>> connections.max.idle.ms = 540000
>>> >> >>>>> session.timeout.ms = 30000
>>> >> >>>>> metrics.num.samples = 2
>>> >> >>>>> key.deserializer = class
>>> >> >>>>> org.apache.kafka.common.serialization.StringDeserializer
>>> >> >>>>> ssl.protocol = TLS
>>> >> >>>>> ssl.provider = null
>>> >> >>>>> ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
>>> >> >>>>> ssl.keystore.location = null
>>> >> >>>>> ssl.cipher.suites = null
>>> >> >>>>> security.protocol = PLAINTEXT
>>> >> >>>>> ssl.keymanager.algorithm = SunX509
>>> >> >>>>> metrics.sample.window.ms = 30000
>>> >> >>>>> auto.offset.reset = earliest
>>> >> >>>>>
>>> >> >>>>> *Example usage with KafkaRDD:*
>>> >> >>>>> val channels = Seq("channel1", "channel2")
>>> >> >>>>>
>>> >> >>>>> channels.toParArray.foreach { channel =>
>>> >> >>>>>  val consumer = new KafkaConsumer[K, V](kafkaParams.asJava)
>>> >> >>>>>
>>> >> >>>>>  // Get offsets for the given topic and the consumer group 'storage-group'
>>> >> >>>>>  val offsetRanges = getOffsets("storage-group", channel)
>>> >> >>>>>
>>> >> >>>>>  val ds = KafkaUtils.createRDD[K, V](context,
>>> >> >>>>>        kafkaParams asJava,
>>> >> >>>>>        offsetRanges,
>>> >> >>>>>        PreferConsistent).toDS[V]
>>> >> >>>>>
>>> >> >>>>>  // Do some aggregations
>>> >> >>>>>  ds.agg(...)
>>> >> >>>>>  // Save the data
>>> >> >>>>>  ds.write.mode(SaveMode.Append).parquet(somePath)
>>> >> >>>>>  // Save offsets using a KafkaConsumer
>>> >> >>>>>  consumer.commitSync(newOffsets.asJava)
>>> >> >>>>>  consumer.close()
>>> >> >>>>> }
>>> >> >>>>>
>>> >> >>>>>
>>> >> >>>>> *Example usage with Kafka Stream:*
>>> >> >>>>> This creates a stream and processes events in each partition. At
>>> >> >>>>> the end of processing for each partition, we update the offsets
>>> >> >>>>> for that partition. This is challenging to do, but is better than
>>> >> >>>>> calling commitAsync on the stream, because that occurs after the
>>> >> >>>>> /entire/ RDD has been processed. This method minimizes duplicates
>>> >> >>>>> in an exactly once environment. Since the executors use their own
>>> >> >>>>> custom group "spark-executor-processor-group" and the commit is
>>> >> >>>>> buried in private functions, we are unable to use the executors'
>>> >> >>>>> cached consumer to update the offsets. This requires us to go
>>> >> >>>>> through multiple steps to update the Kafka offsets accordingly.
>>> >> >>>>>
>>> >> >>>>> val offsetRanges = getOffsets("processor-group", "my-topic")
>>> >> >>>>>
>>> >> >>>>> val stream = KafkaUtils.createDirectStream[K, V](context,
>>> >> >>>>>      PreferConsistent,
>>> >> >>>>>      Subscribe[K, V](Seq("my-topic") asJavaCollection,
>>> >> >>>>>        kafkaParams,
>>> >> >>>>>        offsetRanges))
>>> >> >>>>>
>>> >> >>>>> stream.foreachRDD { rdd =>
>>> >> >>>>>    val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
>>> >> >>>>>
>>> >> >>>>>    // Transform our data
>>> >> >>>>>   rdd.foreachPartition { events =>
>>> >> >>>>>       // Establish a consumer in the executor so we can update offsets
>>> >> >>>>>       // after each partition. This class is homegrown and uses the
>>> >> >>>>>       // KafkaConsumer to help get/set offsets
>>> >> >>>>>       val consumer = new ConsumerUtils(kafkaParams)
>>> >> >>>>>       // do something with our data
>>> >> >>>>>
>>> >> >>>>>       // Write the offsets that were updated in this partition
>>> >> >>>>>       consumer.setConsumerOffsets("processor-group",
>>> >> >>>>>          Map(TopicAndPartition(tp.topic, tp.partition) -> endOffset))
>>> >> >>>>>   }
>>> >> >>>>> }
>>> >> >>>>>
>>> >> >>>>>
>>> >> >>>>>
>>> >> >>>>> --
>>> >> >>>>> View this message in context:
>>> >> >>>>>
>>> >> >>>>> http://apache-spark-user-list.1001560.n3.nabble.com/Instability-issues-with-Spark-2-0-1-and-Kafka-0-10-tp28017p28020.html
>>> >> >>>>> Sent from the Apache Spark User List mailing list archive at
>>> >> >>>>> Nabble.com.
>>> >> >>>>>
>>> >> >>>>>
>>> >> >>>>>
>>> >> >>>
>>> >> >>>
>>> >> >>
>>> >> >>
>>> >> >
>>> >
>>> >
>>>
>>
>>
>
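
For reference, the settings discussed in this thread can be collected in one place. This is only a rough sketch: it assumes the Spark properties `spark.streaming.kafka.consumer.poll.ms` and `spark.streaming.kafka.consumer.cache.maxCapacity` from the Kafka 0.10 integration, and the object name, the helper functions, and the values (10 seconds as in the Nov 10 test quoted above, and an arbitrary 128 for the cache) are illustrations rather than recommendations.

```scala
object KafkaTuningSketch {
  // Illustrative values only; tune them for your own cluster.
  val settings: Seq[(String, String)] = Seq(
    "spark.streaming.kafka.consumer.poll.ms" -> "10000",
    "spark.streaming.kafka.consumer.cache.maxCapacity" -> "128"
  )

  // One consumer group per channel, following the s"$prefix-$topic" scheme
  // quoted above (hypothetical helper, not part of Spark or Kafka).
  def groupIdFor(prefix: String, channel: String): String = s"$prefix-$channel"

  // Render the settings as spark-submit style arguments.
  def asSubmitArgs: Seq[String] =
    settings.flatMap { case (k, v) => Seq("--conf", s"$k=$v") }
}
```

Whether a larger cache actually helps depends on how many topic-partitions each executor ends up serving, so treat the number as something to experiment with.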

Re: Instability issues with Spark 2.0.1 and Kafka 0.10

Posted by "Shixiong(Ryan) Zhu" <sh...@databricks.com>.
Yeah, the KafkaRDD cannot be reused. It's better to document it.
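
For anyone following along, the stack trace quoted below goes through `removeEldestEntry` and `LinkedHashMap.afterNodeInsertion`, i.e. the consumer is closed by whichever thread inserts a new cache entry. Here is a minimal JDK-only sketch of that eviction pattern. It is not the Spark source: `FakeConsumer` is a hypothetical stand-in for `KafkaConsumer`, and the capacity of 2 is made up so the eviction is easy to see.

```scala
import java.{util => ju}

object EvictionSketch {
  // Hypothetical stand-in for KafkaConsumer; it only records whether close() ran.
  final class FakeConsumer(val key: String) {
    @volatile var closed = false
    def close(): Unit = closed = true
  }

  // Access-ordered LinkedHashMap that closes and drops the eldest entry
  // once more than maxCapacity entries are present.
  def newCache(maxCapacity: Int): ju.LinkedHashMap[String, FakeConsumer] =
    new ju.LinkedHashMap[String, FakeConsumer](16, 0.75f, true) {
      override def removeEldestEntry(
          eldest: ju.Map.Entry[String, FakeConsumer]): Boolean = {
        if (this.size() > maxCapacity) {
          // Runs on the inserting thread, even if another thread
          // is still using the evicted consumer.
          eldest.getValue.close()
          true
        } else false
      }
    }

  // Insert three consumers into a cache of capacity 2 and report which got closed.
  def demo(): Seq[Boolean] = {
    val cache = newCache(maxCapacity = 2)
    val consumers =
      Seq("g1-topic1-0", "g2-topic2-0", "g3-topic3-0").map(new FakeConsumer(_))
    consumers.foreach(c => cache.put(c.key, c))
    consumers.map(_.closed)
  }
}
```

With a real `KafkaConsumer`, a `close()` issued from the inserting thread while another thread is mid-poll on the same consumer would be consistent with the "not safe for multi-threaded access" error in the trace.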

On Thu, Nov 10, 2016 at 8:26 AM, Ivan von Nagy <iv...@vadio.com> wrote:

> Ok, I have split the KafkaRDD logic so that each uses its own group, and
> bumped the poll.ms to 10 seconds. Anything less than 2 seconds on the poll.ms
> ends up with a timeout and exception so I am still perplexed on that one.
> The new error I am getting now is a `ConcurrentModificationException`
> when Spark is trying to remove the CachedKafkaConsumer.
>
> java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access
> at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1431)
> at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:1361)
> at org.apache.spark.streaming.kafka010.CachedKafkaConsumer$$anon$1.removeEldestEntry(CachedKafkaConsumer.scala:128)
> at java.util.LinkedHashMap.afterNodeInsertion(LinkedHashMap.java:299)
>
> Here is the basic logic:
>
> *Using KafkaRDD* - This takes a list of channels and processes them in
> parallel using the KafkaRDD directly. They each use a distinct consumer
> group (s"$prefix-$topic"), each has its own topic, and each topic has
> 4 partitions. We routinely get timeout errors when polling for data when
> the poll.ms is less than 2 seconds. This occurs whether or not we process
> in parallel.
>
> *Example usage with KafkaRDD:*
> val channels = Seq("channel1", "channel2")
>
> channels.toParArray.foreach { channel =>
>   val consumer = new KafkaConsumer[K, V](kafkaParams.asJava)
>
>   // Get offsets for the given topic and the consumer group "$prefix-$topic"
>   val offsetRanges = getOffsets(s"$prefix-$topic", channel)
>
>   val ds = KafkaUtils.createRDD[K, V](context,
>         kafkaParams.asJava,
>         offsetRanges,
>         PreferConsistent).toDS[V]
>
>   // Do some aggregations
>   ds.agg(...)
>   // Save the data
>   ds.write.mode(SaveMode.Append).parquet(somePath)
>   // Save offsets using a KafkaConsumer
>   consumer.commitSync(newOffsets.asJava)
>   consumer.close()
> }
>
> I am not sure why the concurrency issue is there. I have tried to debug it
> and have also looked at the KafkaConsumer code, and from what I can see it
> should not occur. The things to figure out are why this occurs when running
> in parallel, and why the timeouts still occur.
>
> Thanks,
>
> Ivan
>
> On Mon, Nov 7, 2016 at 11:55 AM, Cody Koeninger <co...@koeninger.org>
> wrote:
>
>> There definitely is Kafka documentation indicating that you should use
>> a different consumer group for logically different subscribers, this
>> is really basic to Kafka:
>>
>> http://kafka.apache.org/documentation#intro_consumers
>>
>> As for your comment that "commit async after each RDD, which is not
>> really viable also", how is it not viable?  Again, committing offsets
>> to Kafka doesn't give you reliable delivery semantics unless your
>> downstream data store is idempotent.  If your downstream data store is
>> idempotent, then it shouldn't matter to you when offset commits
>> happen, as long as they happen within a reasonable time after the data
>> is written.
>>
>> Do you want to keep arguing with me, or follow my advice and proceed
>> with debugging any remaining issues after you make the changes I
>> suggested?
>>
>> On Mon, Nov 7, 2016 at 1:35 PM, Ivan von Nagy <iv...@vadio.com> wrote:
>> > With our stream version, we update the offsets for only the partition
>> > we are operating on. We even break down the partition into smaller
>> > batches and then update the offsets after each batch within the
>> > partition. With Spark 1.6 and Kafka 0.8.x this was not an issue, and as
>> > Sean pointed out, this is not necessarily a Spark issue since Kafka no
>> > longer allows you to simply update the offsets for a given consumer
>> > group. You have to subscribe or assign partitions to even do so.
>> >
>> > As for storing the offsets in some other place like a DB, I don't find
>> > this useful because you then can't use tools like Kafka Manager. In
>> > order to do so you would have to store in a DB and then circle back and
>> > update Kafka afterwards. This means you have to keep two sources in
>> > sync, which is not really a good idea.
>> >
>> > It is a challenge in Spark to use the Kafka offsets since the driver
>> > keeps subscribed to the topic(s) and consumer group, while the executors
>> > prepend "spark-executor-" to the consumer group. The stream (driver)
>> > does allow you to commit async after each RDD, which is not really
>> > viable also. I have thought of implementing an Akka actor system on the
>> > driver and sending it messages from the executor code to update the
>> > offsets, but then that is asynchronous as well so not really a good
>> > solution.
>> >
>> > I have no idea why Kafka made this change and also why in the parallel
>> > KafkaRDD application we would be advised to use different consumer
>> > groups for each RDD. It seems strange to me that different consumer
>> > groups would be required or advised. There is no Kafka documentation
>> > that I know of that states this. The biggest issue I see with the
>> > parallel KafkaRDD is the timeouts. I have tried to set poll.ms to 30
>> > seconds and still get the issue. Something does not seem right here. As
>> > I mentioned with the streaming application, with Spark 1.6 and Kafka
>> > 0.8.x we never saw this issue. We have been running the same basic
>> > logic for over a year now without one hitch at all.
>> >
>> > Ivan
>> >
>> >
>> > On Mon, Nov 7, 2016 at 11:16 AM, Cody Koeninger <co...@koeninger.org>
>> wrote:
>> >>
>> >> Someone can correct me, but I'm pretty sure Spark dstreams (in
>> >> general, not just kafka) have been progressing on to the next batch
>> >> after a given batch aborts for quite some time now.  Yet another
>> >> reason I put offsets in my database transactionally.  My jobs throw
>> >> exceptions if the offset in the DB isn't what I expected it to be.
>> >>
>> >>
>> >>
>> >>
>> >> On Mon, Nov 7, 2016 at 1:08 PM, Sean McKibben <gr...@graphex.com>
>> wrote:
>> >> > I've been encountering the same kinds of timeout issues as Ivan,
>> using
>> >> > the "Kafka Stream" approach that he is using, except I'm storing my
>> offsets
>> >> > manually from the driver to Zookeeper in the Kafka 8 format. I
>> haven't yet
>> >> > implemented the KafkaRDD approach, and therefore don't have the
>> concurrency
>> >> > issues, but a very similar use case is coming up for me soon, it's
>> just been
>> >> > backburnered until I can get streaming to be more reliable (I will
>> >> > definitely ensure unique group IDs when I do). Offset commits are
>> certainly
>> >> > more painful in Kafka 0.10, and that doesn't have anything to do
>> with Spark.
>> >> >
>> >> > While i may be able to alleviate the timeout by just increasing it,
>> I've
>> >> > noticed something else that is more worrying: When one task fails 4
>> times in
>> >> > a row (i.e. "Failed to get records for _ after polling for _"),
>> Spark aborts
>> >> > the Stage and Job with "Job aborted due to stage failure: Task _ in
>> stage _
>> >> > failed 4 times". That's fine, and it's the behavior I want, but
>> instead of
>> >> > stopping the Application there (as previous versions of Spark did)
>> the next
>> >> > microbatch marches on and offsets are committed ahead of the failed
>> >> > microbatch. Suddenly my at-least-once app becomes more
>> >> > sometimes-at-least-once which is no good. In order for spark to
>> display that
>> >> > failure, I must be propagating the errors up to Spark, but the
>> behavior of
>> >> > marching forward with the next microbatch seems to be new, and a big
>> >> > potential for data loss in streaming applications.
>> >> >
>> >> > Am I perhaps missing a setting to stop the entire streaming
>> application
>> >> > once spark.task.maxFailures is reached? Has anyone else seen this
>> behavior
>> >> > of a streaming application skipping over failed microbatches?
>> >> >
>> >> > Thanks,
>> >> > Sean
>> >> >
>> >> >
>> >> >> On Nov 4, 2016, at 2:48 PM, Cody Koeninger <co...@koeninger.org>
>> wrote:
>> >> >>
>> >> >> So basically what I am saying is
>> >> >>
>> >> >> - increase poll.ms
>> >> >> - use a separate group id everywhere
>> >> >> - stop committing offsets under the covers
>> >> >>
>> >> >> That should eliminate all of those as possible causes, and then we
>> can
>> >> >> see if there are still issues.
>> >> >>
>> >> >> As far as 0.8 vs 0.10, Spark doesn't require you to assign or
>> >> >> subscribe to a topic in order to update offsets, Kafka does.  If you
>> >> >> don't like the new Kafka consumer api, the existing 0.8 simple
>> >> >> consumer api should be usable with later brokers.  As long as you
>> >> >> don't need SSL or dynamic subscriptions, and it meets your needs,
>> keep
>> >> >> using it.
>> >> >>
>> >> >> On Fri, Nov 4, 2016 at 3:37 PM, Ivan von Nagy <iv...@vadio.com>
>> wrote:
>> >> >>> Yes, the parallel KafkaRDD uses the same consumer group, but each
>> RDD
>> >> >>> uses a
>> >> >>> single distinct topic. For example, the group would be something
>> like
>> >> >>> "storage-group", and the topics would be "storage-channel1", and
>> >> >>> "storage-channel2". In each thread a KafkaConsumer is started,
>> >> >>> assigned the
>> >> >>> partitions assigned, and then commit offsets are called after the
>> RDD
>> >> >>> is
>> >> >>> processed. This should not interfere with the consumer group used
>> by
>> >> >>> the
>> >> >>> executors which would be "spark-executor-storage-group".
>> >> >>>
>> >> >>> In the streaming example there is a single topic ("client-events")
>> and
>> >> >>> group
>> >> >>> ("processing-group"). A single stream is created and offsets are
>> >> >>> manually
>> >> >>> updated from the executor after each partition is handled. This
>> was a
>> >> >>> challenge since Spark now requires one to assign or subscribe to a
>> >> >>> topic in
>> >> >>> order to even update the offsets. In 0.8.2.x you did not have to
>> worry
>> >> >>> about
>> >> >>> that. This approach limits your exposure to duplicate data since
>> >> >>> idempotent
>> >> >>> records are not entirely possible in our scenario. At least
>> without a
>> >> >>> lot of
>> >> >>> re-running of logic to de-dup.
>> >> >>>
>> >> >>> Thanks,
>> >> >>>
>> >> >>> Ivan
>> >> >>>
>> >> >>> On Fri, Nov 4, 2016 at 1:24 PM, Cody Koeninger <cody@koeninger.org
>> >
>> >> >>> wrote:
>> >> >>>>
>> >> >>>> So just to be clear, the answers to my questions are
>> >> >>>>
>> >> >>>> - you are not using different group ids, you're using the same
>> group
>> >> >>>> id everywhere
>> >> >>>>
>> >> >>>> - you are committing offsets manually
>> >> >>>>
>> >> >>>> Right?
>> >> >>>>
>> >> >>>> If you want to eliminate network or kafka misbehavior as a source,
>> >> >>>> tune poll.ms upwards even higher.
>> >> >>>>
>> >> >>>> You must use different group ids for different rdds or streams.
>> >> >>>> Kafka consumers won't behave the way you expect if they are all in
>> >> >>>> the
>> >> >>>> same group id, and the consumer cache is keyed by group id. Yes,
>> the
>> >> >>>> executor will tack "spark-executor-" on to the beginning, but if
>> you
>> >> >>>> give it the same base group id, it will be the same.  And the
>> driver
>> >> >>>> will use the group id you gave it, unmodified.
>> >> >>>>
>> >> >>>> Finally, I really can't help you if you're manually writing your
>> own
>> >> >>>> code to commit offsets directly to Kafka.  Trying to minimize
>> >> >>>> duplicates that way doesn't really make sense, your system must be
>> >> >>>> able to handle duplicates if you're using kafka as an offsets
>> store,
>> >> >>>> it can't do transactional exactly once.
>> >> >>>>
>> >> >>>> On Fri, Nov 4, 2016 at 1:48 PM, vonnagy <iv...@vadio.com> wrote:
>> >> >>>>> Here are some examples and details of the scenarios. The KafkaRDD
>> >> >>>>> is the most prone to polling timeouts and concurrent modification
>> >> >>>>> errors.
>> >> >>>>>
>> >> >>>>> *Using KafkaRDD* - This takes a list of channels and processes
>> >> >>>>> them in parallel using the KafkaRDD directly. They all use the
>> >> >>>>> same consumer group ('storage-group'), but each has its own topic
>> >> >>>>> and each topic has 4 partitions. We routinely get timeout errors
>> >> >>>>> when polling for data. This occurs whether we process in parallel
>> >> >>>>> or sequentially.
>> >> >>>>>
>> >> >>>>> *Spark Kafka setting:*
>> >> >>>>> spark.streaming.kafka.consumer.poll.ms=2000
>> >> >>>>>
>> >> >>>>> *Kafka Consumer Params:*
>> >> >>>>> metric.reporters = []
>> >> >>>>> metadata.max.age.ms = 300000
>> >> >>>>> partition.assignment.strategy =
>> >> >>>>> [org.apache.kafka.clients.consumer.RangeAssignor]
>> >> >>>>> reconnect.backoff.ms = 50
>> >> >>>>> sasl.kerberos.ticket.renew.window.factor = 0.8
>> >> >>>>> max.partition.fetch.bytes = 1048576
>> >> >>>>> bootstrap.servers = [somemachine:31000]
>> >> >>>>> ssl.keystore.type = JKS
>> >> >>>>> enable.auto.commit = false
>> >> >>>>> sasl.mechanism = GSSAPI
>> >> >>>>> interceptor.classes = null
>> >> >>>>> exclude.internal.topics = true
>> >> >>>>> ssl.truststore.password = null
>> >> >>>>> client.id =
>> >> >>>>> ssl.endpoint.identification.algorithm = null
>> >> >>>>> max.poll.records = 1000
>> >> >>>>> check.crcs = true
>> >> >>>>> request.timeout.ms = 40000
>> >> >>>>> heartbeat.interval.ms = 3000
>> >> >>>>> auto.commit.interval.ms = 5000
>> >> >>>>> receive.buffer.bytes = 65536
>> >> >>>>> ssl.truststore.type = JKS
>> >> >>>>> ssl.truststore.location = null
>> >> >>>>> ssl.keystore.password = null
>> >> >>>>> fetch.min.bytes = 1
>> >> >>>>> send.buffer.bytes = 131072
>> >> >>>>> value.deserializer = class
>> >> >>>>> com.vadio.analytics.spark.storage.ClientEventJsonOptionDeserializer
>> >> >>>>> group.id = storage-group
>> >> >>>>> retry.backoff.ms = 100
>> >> >>>>> sasl.kerberos.kinit.cmd = /usr/bin/kinit
>> >> >>>>> sasl.kerberos.service.name = null
>> >> >>>>> sasl.kerberos.ticket.renew.jitter = 0.05
>> >> >>>>> ssl.trustmanager.algorithm = PKIX
>> >> >>>>> ssl.key.password = null
>> >> >>>>> fetch.max.wait.ms = 500
>> >> >>>>> sasl.kerberos.min.time.before.relogin = 60000
>> >> >>>>> connections.max.idle.ms = 540000
>> >> >>>>> session.timeout.ms = 30000
>> >> >>>>> metrics.num.samples = 2
>> >> >>>>> key.deserializer = class
>> >> >>>>> org.apache.kafka.common.serialization.StringDeserializer
>> >> >>>>> ssl.protocol = TLS
>> >> >>>>> ssl.provider = null
>> >> >>>>> ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
>> >> >>>>> ssl.keystore.location = null
>> >> >>>>> ssl.cipher.suites = null
>> >> >>>>> security.protocol = PLAINTEXT
>> >> >>>>> ssl.keymanager.algorithm = SunX509
>> >> >>>>> metrics.sample.window.ms = 30000
>> >> >>>>> auto.offset.reset = earliest
>> >> >>>>>
>> >> >>>>> *Example usage with KafkaRDD:*
>> >> >>>>> val channels = Seq("channel1", "channel2")
>> >> >>>>>
>> >> >>>>> channels.toParArray.foreach { channel =>
>> >> >>>>>  val consumer = new KafkaConsumer[K, V](kafkaParams.asJava)
>> >> >>>>>
>> >> >>>>>  // Get offsets for the given topic and the consumer group 'storage-group'
>> >> >>>>>  val offsetRanges = getOffsets("storage-group", channel)
>> >> >>>>>
>> >> >>>>>  val ds = KafkaUtils.createRDD[K, V](context,
>> >> >>>>>        kafkaParams asJava,
>> >> >>>>>        offsetRanges,
>> >> >>>>>        PreferConsistent).toDS[V]
>> >> >>>>>
>> >> >>>>>  // Do some aggregations
>> >> >>>>>  ds.agg(...)
>> >> >>>>>  // Save the data
>> >> >>>>>  ds.write.mode(SaveMode.Append).parquet(somePath)
>> >> >>>>>  // Save offsets using a KafkaConsumer
>> >> >>>>>  consumer.commitSync(newOffsets.asJava)
>> >> >>>>>  consumer.close()
>> >> >>>>> }
>> >> >>>>>
>> >> >>>>>
>> >> >>>>> *Example usage with Kafka Stream:*
>> >> >>>>> This creates a stream and processes events in each partition. At the
>> >> >>>>> end of processing for each partition, we update the offsets for that
>> >> >>>>> partition. This is challenging to do, but is better than calling
>> >> >>>>> commitAsync on the stream, because that occurs after the /entire/ RDD
>> >> >>>>> has been processed. This method minimizes duplicates in an exactly
>> >> >>>>> once environment. Since the executors use their own custom group
>> >> >>>>> "spark-executor-processor-group" and the commit is buried in private
>> >> >>>>> functions, we are unable to use the executors' cached consumer to
>> >> >>>>> update the offsets. This requires us to go through multiple steps to
>> >> >>>>> update the Kafka offsets accordingly.
>> >> >>>>>
>> >> >>>>> val offsetRanges = getOffsets("processor-group", "my-topic")
>> >> >>>>>
>> >> >>>>> val stream = KafkaUtils.createDirectStream[K, V](context,
>> >> >>>>>      PreferConsistent,
>> >> >>>>>      Subscribe[K, V](Seq("my-topic") asJavaCollection,
>> >> >>>>>        kafkaParams,
>> >> >>>>>        offsetRanges))
>> >> >>>>>
>> >> >>>>> stream.foreachRDD { rdd =>
>> >> >>>>>    val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
>> >> >>>>>
>> >> >>>>>    // Transform our data
>> >> >>>>>   rdd.foreachPartition { events =>
>> >> >>>>>       // Establish a consumer in the executor so we can update offsets
>> >> >>>>>       // after each partition.
>> >> >>>>>       // This class is homegrown and uses the KafkaConsumer to help
>> >> >>>>>       // get/set offsets
>> >> >>>>>       val consumer = new ConsumerUtils(kafkaParams)
>> >> >>>>>       // do something with our data
>> >> >>>>>
>> >> >>>>>       // Write the offsets that were updated in this partition
>> >> >>>>>       consumer.setConsumerOffsets("processor-group",
>> >> >>>>>          Map(TopicAndPartition(tp.topic, tp.partition) -> endOffset))
>> >> >>>>>   }
>> >> >>>>> }
>> >> >>>>>
>> >> >>>>>
>> >> >>>>>

Re: Instability issues with Spark 2.0.1 and Kafka 0.10

Posted by Ivan von Nagy <iv...@vadio.com>.
Ok, I have split the KafkaRDD logic so that each RDD uses its own group, and
bumped the poll.ms to 10 seconds. Anything less than 2 seconds on the poll.ms
ends up with a timeout and exception, so I am still perplexed on that one. The
new error I am getting now is a `ConcurrentModificationException` when
Spark is trying to remove the CachedKafkaConsumer.

java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access
    at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1431)
    at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:1361)
    at org.apache.spark.streaming.kafka010.CachedKafkaConsumer$$anon$1.removeEldestEntry(CachedKafkaConsumer.scala:128)
    at java.util.LinkedHashMap.afterNodeInsertion(LinkedHashMap.java:299)
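
For anyone unfamiliar with the "not safe for multi-threaded access" message,
the kind of check behind it can be modelled roughly as below. This is a
simplified sketch in plain Scala, assuming an ownership guard keyed on the
thread id. SingleThreadGuard and GuardDemo are hypothetical names, and the
real KafkaConsumer.acquire may differ in detail.

```scala
import java.util.ConcurrentModificationException
import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}

// Simplified, hypothetical model of a single-thread ownership guard.
final class SingleThreadGuard {
  private val NoOwner = -1L
  private val owner = new AtomicLong(NoOwner)

  // Succeeds if the guard is free or already owned by the calling thread.
  def acquire(): Unit = {
    val me = Thread.currentThread().getId
    if (owner.get() != me && !owner.compareAndSet(NoOwner, me))
      throw new ConcurrentModificationException("not safe for multi-threaded access")
  }

  def release(): Unit = owner.set(NoOwner)
}

object GuardDemo {
  // Returns true if a second thread is rejected while the first still holds the guard.
  def secondThreadRejected(): Boolean = {
    val guard = new SingleThreadGuard
    guard.acquire() // the calling thread now owns the guard
    val rejected = new AtomicBoolean(false)
    val other = new Thread(() => {
      try guard.acquire()
      catch { case _: ConcurrentModificationException => rejected.set(true) }
    })
    other.start()
    other.join()
    guard.release()
    rejected.get()
  }
}
```

In this model, any call that goes through acquire() from a thread other than
the current owner fails, which matches close() being rejected in the trace.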

Here is the basic logic:

*Using KafkaRDD* - This takes a list of channels and processes them in
parallel using the KafkaRDD directly. Each uses a distinct consumer
group (s"$prefix-$topic"), each has its own topic, and each topic has 4
partitions. We routinely get timeout errors when polling for data when the
poll.ms is less than 2 seconds. This occurs whether we process in parallel
or sequentially.

*Example usage with KafkaRDD:*
val channels = Seq("channel1", "channel2")

channels.toParArray.foreach { channel =>
  val consumer = new KafkaConsumer[K, V](kafkaParams.asJava)

  // Get offsets for the given topic and the consumer group "$prefix-$topic"
  val offsetRanges = getOffsets(s"$prefix-$topic", channel)

  val ds = KafkaUtils.createRDD[K, V](context,
        kafkaParams.asJava,
        offsetRanges,
        PreferConsistent).toDS[V]

  // Do some aggregations
  ds.agg(...)
  // Save the data
  ds.write.mode(SaveMode.Append).parquet(somePath)
  // Save offsets using a KafkaConsumer
  consumer.commitSync(newOffsets.asJava)
  consumer.close()
}
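
For the group id to really be distinct per RDD, the `group.id` entry in the
params passed to each RDD has to differ as well, not just the name used when
looking up offsets. A minimal sketch of deriving per-topic params from a
shared base map, in plain Scala. GroupIds and paramsFor are hypothetical
helpers, not part of Spark or Kafka.

```scala
object GroupIds {
  // Hypothetical helper: copies the shared params and overrides group.id
  // with a value unique to the given topic.
  def paramsFor(base: Map[String, Object], prefix: String, topic: String): Map[String, Object] =
    base + ("group.id" -> s"$prefix-$topic")
}

object GroupIdsDemo {
  val base: Map[String, Object] = Map(
    "bootstrap.servers" -> "somemachine:31000",
    "group.id" -> "storage-group"
  )

  // Each channel gets its own copy of the params with its own group id.
  val perChannel: Map[String, Map[String, Object]] =
    Seq("channel1", "channel2")
      .map(ch => ch -> GroupIds.paramsFor(base, "storage", ch))
      .toMap
}
```

Each parallel branch would then pass its own map (converted with `.asJava`)
to `createRDD` instead of the shared `kafkaParams`.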

I am not sure why the concurrency issue is there. I have tried to debug it
and have also looked at the KafkaConsumer code, and as far as I can tell it
should not occur. The things to figure out are why this occurs when running
in parallel, and why the timeouts still occur.
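
One possible explanation, consistent with the stack trace above (close()
called from removeEldestEntry during an insert), is that a bounded cache
evicts and closes an entry on whichever thread happens to insert a new one.
The sketch below is a simplified model in plain Scala, not Spark's actual
CachedKafkaConsumer code. FakeConsumer, ConsumerCache, and the capacity of 2
are hypothetical names and values chosen for illustration.

```scala
import java.util.{LinkedHashMap => JLinkedHashMap, Map => JMap}

// Hypothetical stand-in for a consumer that another task may still be using.
final class FakeConsumer(val key: String) {
  @volatile var closed: Boolean = false
  def close(): Unit = { closed = true }
}

// Access-ordered LRU cache that closes whatever entry it evicts.
// The close happens on the thread that performs the insert.
final class ConsumerCache(maxCapacity: Int)
    extends JLinkedHashMap[String, FakeConsumer](16, 0.75f, true) {
  override def removeEldestEntry(eldest: JMap.Entry[String, FakeConsumer]): Boolean =
    if (size() > maxCapacity) {
      eldest.getValue.close()
      true
    } else false
}

object CacheEvictionDemo {
  // Returns (whether the first consumer was closed, final cache size).
  def run(): (Boolean, Int) = {
    val cache = new ConsumerCache(maxCapacity = 2)
    val inUse = new FakeConsumer("topicA-0")
    cache.put(inUse.key, inUse)
    cache.put("topicB-0", new FakeConsumer("topicB-0"))
    cache.put("topicC-0", new FakeConsumer("topicC-0")) // evicts and closes topicA-0
    (inUse.closed, cache.size())
  }
}
```

In this model the third insert closes the first consumer regardless of who
is using it. With more topic-partitions in flight than cache slots, that
kind of eviction becomes more likely.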

Thanks,

Ivan

On Mon, Nov 7, 2016 at 11:55 AM, Cody Koeninger <co...@koeninger.org> wrote:

> There definitely is Kafka documentation indicating that you should use
> a different consumer group for logically different subscribers, this
> is really basic to Kafka:
>
> http://kafka.apache.org/documentation#intro_consumers
>
> As for your comment that "commit async after each RDD, which is not
> really viable also", how is it not viable?  Again, committing offsets
> to Kafka doesn't give you reliable delivery semantics unless your
> downstream data store is idempotent.  If your downstream data store is
> idempotent, then it shouldn't matter to you when offset commits
> happen, as long as they happen within a reasonable time after the data
> is written.
>
> Do you want to keep arguing with me, or follow my advice and proceed
> with debugging any remaining issues after you make the changes I
> suggested?
>

Re: Instability issues with Spark 2.0.1 and Kafka 0.10

Posted by Cody Koeninger <co...@koeninger.org>.
There definitely is Kafka documentation indicating that you should use
a different consumer group for logically different subscribers. This
is really basic to Kafka:

http://kafka.apache.org/documentation#intro_consumers

As for your comment that "commit async after each RDD, which is not
really viable also", how is it not viable?  Again, committing offsets
to Kafka doesn't give you reliable delivery semantics unless your
downstream data store is idempotent.  If your downstream data store is
idempotent, then it shouldn't matter to you when offset commits
happen, as long as they happen within a reasonable time after the data
is written.
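
As a rough illustration of what "idempotent" means here: if the store is
keyed on something that uniquely identifies the Kafka message, replaying a
batch after a late or failed offset commit leaves the data unchanged. The
sketch below uses an in-memory map as a stand-in for a real store. Event,
IdempotentStore, and ReplayDemo are hypothetical names, and keying on
(topic, partition, offset) is just one possible choice.

```scala
import scala.collection.mutable

// Hypothetical record shape; (topic, partition, offset) identifies a Kafka message.
final case class Event(topic: String, partition: Int, offset: Long, payload: String)

// In-memory stand-in for a store with a unique key on (topic, partition, offset).
final class IdempotentStore {
  private val rows = mutable.Map.empty[(String, Int, Long), String]

  // Writing the same message twice overwrites the row instead of adding a duplicate.
  def upsert(e: Event): Unit =
    rows.update((e.topic, e.partition, e.offset), e.payload)

  def rowCount: Int = rows.size
}

object ReplayDemo {
  // Simulates a batch being written, the offset commit being lost,
  // and the same batch being processed again.
  def rowsAfterReplay(): Int = {
    val batch = (0L until 3L).map(o => Event("my-topic", 0, o, s"event-$o"))
    val store = new IdempotentStore
    batch.foreach(store.upsert) // first attempt
    batch.foreach(store.upsert) // replay of the same offsets
    store.rowCount
  }
}
```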

Do you want to keep arguing with me, or follow my advice and proceed
with debugging any remaining issues after you make the changes I
suggested?

On Mon, Nov 7, 2016 at 1:35 PM, Ivan von Nagy <iv...@vadio.com> wrote:
> With our stream version, we update the offsets for only the partition we
> are operating on. We even break down the partition into smaller batches and
> then update the offsets after each batch within the partition. With Spark 1.6
> and Kafka 0.8.x this was not an issue, and as Sean pointed out, this is not
> necessarily a Spark issue since Kafka no longer allows you to simply update
> the offsets for a given consumer group. You have to subscribe or assign
> partitions to even do so.
>
> As for storing the offsets in some other place like a DB, I don't find this
> useful because you then can't use tools like Kafka Manager. In order to do
> so you would have to store in a DB and then circle back and update Kafka
> afterwards. This means you have to keep two sources in sync, which is not
> really a good idea.
>
> It is a challenge in Spark to use the Kafka offsets since the driver stays
> subscribed to the topic(s) and consumer group, while the executors prepend
> "spark-executor-" to the consumer group. The stream (driver) does allow you
> to commit async after each RDD, which is not really viable also. I have
> thought of implementing an Akka actor system on the driver and sending it
> messages from the executor code to update the offsets, but then that is
> asynchronous as well so not really a good solution.
>
> I have no idea why Kafka made this change and also why in the parallel
> KafkaRDD application we would be advised to use different consumer groups
> for each RDD. It seems strange to me that different consumer groups would
> be required or advised. There is no Kafka documentation that I know of that
> states this. The biggest issue I see with the parallel KafkaRDD is the
> timeouts. I have tried to set poll.ms to 30 seconds and still get the issue.
> Something is not right here. As I mentioned with the streaming application,
> with Spark 1.6 and Kafka 0.8.x we never saw this issue. We have been running
> the same basic logic for over a year now without one hitch at all.
>
> Ivan
>
>
> On Mon, Nov 7, 2016 at 11:16 AM, Cody Koeninger <co...@koeninger.org> wrote:
>>
>> Someone can correct me, but I'm pretty sure Spark dstreams (in
>> general, not just kafka) have been progressing on to the next batch
>> after a given batch aborts for quite some time now.  Yet another
>> reason I put offsets in my database transactionally.  My jobs throw
>> exceptions if the offset in the DB isn't what I expected it to be.
>>
>>
>>
>>
>> On Mon, Nov 7, 2016 at 1:08 PM, Sean McKibben <gr...@graphex.com> wrote:
>> > I've been encountering the same kinds of timeout issues as Ivan, using
>> > the "Kafka Stream" approach that he is using, except I'm storing my offsets
>> > manually from the driver to Zookeeper in the Kafka 8 format. I haven't yet
>> > implemented the KafkaRDD approach, and therefore don't have the concurrency
>> > issues, but a very similar use case is coming up for me soon, it's just been
>> > backburnered until I can get streaming to be more reliable (I will
>> > definitely ensure unique group IDs when I do). Offset commits are certainly
>> > more painful in Kafka 0.10, and that doesn't have anything to do with Spark.
>> >
>> > While I may be able to alleviate the timeout by just increasing it, I've
>> > noticed something else that is more worrying: When one task fails 4 times in
>> > a row (i.e. "Failed to get records for _ after polling for _"), Spark aborts
>> > the Stage and Job with "Job aborted due to stage failure: Task _ in stage _
>> > failed 4 times". That's fine, and it's the behavior I want, but instead of
>> > stopping the Application there (as previous versions of Spark did) the next
>> > microbatch marches on and offsets are committed ahead of the failed
>> > microbatch. Suddenly my at-least-once app becomes more
>> > sometimes-at-least-once which is no good. In order for Spark to display that
>> > failure, I must be propagating the errors up to Spark, but the behavior of
>> > marching forward with the next microbatch seems to be new, and a big
>> > potential for data loss in streaming applications.
>> >
>> > Am I perhaps missing a setting to stop the entire streaming application
>> > once spark.task.maxFailures is reached? Has anyone else seen this behavior
>> > of a streaming application skipping over failed microbatches?
>> >
>> > Thanks,
>> > Sean
>> >
>> >
>> >> On Nov 4, 2016, at 2:48 PM, Cody Koeninger <co...@koeninger.org> wrote:
>> >>
>> >> So basically what I am saying is
>> >>
>> >> - increase poll.ms
>> >> - use a separate group id everywhere
>> >> - stop committing offsets under the covers
>> >>
>> >> That should eliminate all of those as possible causes, and then we can
>> >> see if there are still issues.
>> >>
>> >> As far as 0.8 vs 0.10, Spark doesn't require you to assign or
>> >> subscribe to a topic in order to update offsets, Kafka does.  If you
>> >> don't like the new Kafka consumer api, the existing 0.8 simple
>> >> consumer api should be usable with later brokers.  As long as you
>> >> don't need SSL or dynamic subscriptions, and it meets your needs, keep
>> >> using it.
>> >>
>> >> On Fri, Nov 4, 2016 at 3:37 PM, Ivan von Nagy <iv...@vadio.com> wrote:
>> >>> Yes, the parallel KafkaRDD uses the same consumer group, but each RDD
>> >>> uses a single distinct topic. For example, the group would be something
>> >>> like "storage-group", and the topics would be "storage-channel1" and
>> >>> "storage-channel2". In each thread a KafkaConsumer is started, assigned
>> >>> its partitions, and then used to commit offsets after the RDD is
>> >>> processed. This should not interfere with the consumer group used by
>> >>> the executors, which would be "spark-executor-storage-group".
>> >>>
>> >>> In the streaming example there is a single topic ("client-events") and
>> >>> group ("processing-group"). A single stream is created and offsets are
>> >>> manually updated from the executor after each partition is handled. This
>> >>> was a challenge since Spark now requires one to assign or subscribe to a
>> >>> topic in order to even update the offsets. In 0.8.2.x you did not have
>> >>> to worry about that. This approach limits your exposure to duplicate
>> >>> data since idempotent records are not entirely possible in our scenario,
>> >>> at least without a lot of re-running of logic to de-dup.
>> >>>
>> >>> Thanks,
>> >>>
>> >>> Ivan
>> >>>
>> >>> On Fri, Nov 4, 2016 at 1:24 PM, Cody Koeninger <co...@koeninger.org> wrote:
>> >>>>
>> >>>> So just to be clear, the answers to my questions are
>> >>>>
>> >>>> - you are not using different group ids, you're using the same group
>> >>>> id everywhere
>> >>>>
>> >>>> - you are committing offsets manually
>> >>>>
>> >>>> Right?
>> >>>>
>> >>>> If you want to eliminate network or kafka misbehavior as a source,
>> >>>> tune poll.ms upwards even higher.
>> >>>>
>> >>>> You must use different group ids for different rdds or streams.
>> >>>> Kafka consumers won't behave the way you expect if they are all in
>> >>>> the same group id, and the consumer cache is keyed by group id. Yes,
>> >>>> the executor will tack "spark-executor-" on to the beginning, but if
>> >>>> you give it the same base group id, it will be the same.  And the
>> >>>> driver will use the group id you gave it, unmodified.
>> >>>>
>> >>>> Finally, I really can't help you if you're manually writing your own
>> >>>> code to commit offsets directly to Kafka.  Trying to minimize
>> >>>> duplicates that way doesn't really make sense, your system must be
>> >>>> able to handle duplicates if you're using kafka as an offsets store,
>> >>>> it can't do transactional exactly once.
>> >>>>
>> >>>> On Fri, Nov 4, 2016 at 1:48 PM, vonnagy <iv...@vadio.com> wrote:
>> >>>>> Here are some examples and details of the scenarios. The KafkaRDD is
>> >>>>> the most prone to polling timeouts and concurrent modification errors.
>> >>>>>
>> >>>>> *Using KafkaRDD* - This takes a list of channels and processes them
>> >>>>> in parallel using the KafkaRDD directly. They all use the same consumer
>> >>>>> group ('storage-group'), but each has its own topic and each topic has
>> >>>>> 4 partitions. We routinely get timeout errors when polling for data.
>> >>>>> This occurs whether we process in parallel or sequentially.
>> >>>>>
>> >>>>> *Spark Kafka setting:*
>> >>>>> spark.streaming.kafka.consumer.poll.ms=2000
>> >>>>>
>> >>>>> *Kafka Consumer Params:*
>> >>>>> metric.reporters = []
>> >>>>> metadata.max.age.ms = 300000
>> >>>>> partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor]
>> >>>>> reconnect.backoff.ms = 50
>> >>>>> sasl.kerberos.ticket.renew.window.factor = 0.8
>> >>>>> max.partition.fetch.bytes = 1048576
>> >>>>> bootstrap.servers = [somemachine:31000]
>> >>>>> ssl.keystore.type = JKS
>> >>>>> enable.auto.commit = false
>> >>>>> sasl.mechanism = GSSAPI
>> >>>>> interceptor.classes = null
>> >>>>> exclude.internal.topics = true
>> >>>>> ssl.truststore.password = null
>> >>>>> client.id =
>> >>>>> ssl.endpoint.identification.algorithm = null
>> >>>>> max.poll.records = 1000
>> >>>>> check.crcs = true
>> >>>>> request.timeout.ms = 40000
>> >>>>> heartbeat.interval.ms = 3000
>> >>>>> auto.commit.interval.ms = 5000
>> >>>>> receive.buffer.bytes = 65536
>> >>>>> ssl.truststore.type = JKS
>> >>>>> ssl.truststore.location = null
>> >>>>> ssl.keystore.password = null
>> >>>>> fetch.min.bytes = 1
>> >>>>> send.buffer.bytes = 131072
>> >>>>> value.deserializer = class com.vadio.analytics.spark.storage.ClientEventJsonOptionDeserializer
>> >>>>> group.id = storage-group
>> >>>>> retry.backoff.ms = 100
>> >>>>> sasl.kerberos.kinit.cmd = /usr/bin/kinit
>> >>>>> sasl.kerberos.service.name = null
>> >>>>> sasl.kerberos.ticket.renew.jitter = 0.05
>> >>>>> ssl.trustmanager.algorithm = PKIX
>> >>>>> ssl.key.password = null
>> >>>>> fetch.max.wait.ms = 500
>> >>>>> sasl.kerberos.min.time.before.relogin = 60000
>> >>>>> connections.max.idle.ms = 540000
>> >>>>> session.timeout.ms = 30000
>> >>>>> metrics.num.samples = 2
>> >>>>> key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
>> >>>>> ssl.protocol = TLS
>> >>>>> ssl.provider = null
>> >>>>> ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
>> >>>>> ssl.keystore.location = null
>> >>>>> ssl.cipher.suites = null
>> >>>>> security.protocol = PLAINTEXT
>> >>>>> ssl.keymanager.algorithm = SunX509
>> >>>>> metrics.sample.window.ms = 30000
>> >>>>> auto.offset.reset = earliest
>> >>>>>
>> >>>>> *Example usage with KafkaRDD:*
>> >>>>> val channels = Seq("channel1", "channel2")
>> >>>>>
>> >>>>> channels.toParArray.foreach { channel =>
>> >>>>>   val consumer = new KafkaConsumer[K, V](kafkaParams.asJava)
>> >>>>>
>> >>>>>   // Get offsets for the given topic and the consumer group 'storage-group'
>> >>>>>   val offsetRanges = getOffsets("storage-group", channel)
>> >>>>>
>> >>>>>   // createRDD yields ConsumerRecord[K, V]; keep only the values for the Dataset
>> >>>>>   val ds = KafkaUtils.createRDD[K, V](context,
>> >>>>>         kafkaParams.asJava,
>> >>>>>         offsetRanges,
>> >>>>>         PreferConsistent).map(_.value).toDS()
>> >>>>>
>> >>>>>   // Do some aggregations
>> >>>>>   ds.agg(...)
>> >>>>>   // Save the data
>> >>>>>   ds.write.mode(SaveMode.Append).parquet(somePath)
>> >>>>>   // Save offsets using a KafkaConsumer
>> >>>>>   consumer.commitSync(newOffsets.asJava)
>> >>>>>   consumer.close()
>> >>>>> }
>> >>>>>
>> >>>>>
>> >>>>> *Example usage with Kafka Stream:*
>> >>>>> This creates a stream and processes events in each partition. At the
>> >>>>> end of processing for each partition, we update the offsets for that
>> >>>>> partition. This is challenging to do, but is better than calling
>> >>>>> commitAsync on the stream, because that occurs after the /entire/ RDD
>> >>>>> has been processed. This method minimizes duplicates in an exactly-once
>> >>>>> environment. Since the executors use their own custom group
>> >>>>> "spark-executor-processor-group" and the commit is buried in private
>> >>>>> functions, we are unable to use the executor's cached consumer to
>> >>>>> update the offsets. This requires us to go through multiple steps to
>> >>>>> update the Kafka offsets accordingly.
>> >>>>>
>> >>>>> val offsetRanges = getOffsets("processor-group", "my-topic")
>> >>>>>
>> >>>>> val stream = KafkaUtils.createDirectStream[K, V](context,
>> >>>>>      PreferConsistent,
>> >>>>>      Subscribe[K, V](Seq("my-topic"),
>> >>>>>        kafkaParams,
>> >>>>>        offsetRanges))
>> >>>>>
>> >>>>> stream.foreachRDD { rdd =>
>> >>>>>   val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
>> >>>>>
>> >>>>>   // Transform our data
>> >>>>>   rdd.foreachPartition { events =>
>> >>>>>     // Establish a consumer in the executor so we can update offsets
>> >>>>>     // after each partition. This class is homegrown and uses the
>> >>>>>     // KafkaConsumer to help get/set offsets.
>> >>>>>     val consumer = new ConsumerUtils(kafkaParams)
>> >>>>>     // Offset range handled by this task (RDD partition i maps to offsetRanges(i))
>> >>>>>     val range = offsetRanges(TaskContext.get.partitionId)
>> >>>>>     // do something with our data
>> >>>>>
>> >>>>>     // Write the offsets that were updated in this partition
>> >>>>>     consumer.setConsumerOffsets("processor-group",
>> >>>>>       Map(TopicAndPartition(range.topic, range.partition) -> range.untilOffset))
>> >>>>>   }
>> >>>>> }
>> >>>>>
>> >>>>>
>> >>>>>
>> >>>>> --
>> >>>>> View this message in context:
>> >>>>>
>> >>>>> http://apache-spark-user-list.1001560.n3.nabble.com/Instability-issues-with-Spark-2-0-1-and-Kafka-0-10-tp28017p28020.html


Re: Instability issues with Spark 2.0.1 and Kafka 0.10

Posted by Ivan von Nagy <iv...@vadio.com>.
With our stream version, we update the offsets for only the partition we
are operating on. We even break down the partition into smaller batches and
then update the offsets after each batch within the partition. With Spark
1.6 and Kafka 0.8.x this was not an issue, and as Sean pointed out, this is
not necessarily a Spark issue since Kafka no longer allows you to simply
update the offsets for a given consumer group. You have to subscribe or
assign partitions to even do so.

As for storing the offsets in some other place like a DB, I don't find
this useful because you then can't use tools like Kafka Manager. In order
to do so you would have to store them in a DB and then circle back and
update Kafka afterwards. This means you have to keep two sources in sync,
which is not really a good idea.

It is a challenge in Spark to use the Kafka offsets since the driver stays
subscribed to the topic(s) and consumer group, while the executors prepend
"spark-executor-" to the consumer group. The stream (driver) does allow you
to commit async after each RDD, but that is not really viable either. I have
thought of implementing an Akka actor system on the driver and sending it
messages from the executor code to update the offsets, but that is
asynchronous as well, so not really a good solution.

I have no idea why Kafka made this change, or why in the parallel
KafkaRDD application we would be advised to use different consumer groups
for each RDD. It seems strange to me that different consumer groups would
be required or advised. There is no Kafka documentation that I know of that
states this. The biggest issue I see with the parallel KafkaRDD is the
timeouts. I have tried setting poll.ms to 30 seconds and still get the
issue. Something is not right here. As I mentioned, the streaming
application never showed this issue with Spark 1.6 and Kafka 0.8.x. We have
been running the same basic logic for over a year now without a single
hitch.

Ivan
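
For reference, the "different group id per RDD" advice given elsewhere in this thread can be sketched in plain Scala. This is only an illustration: `GroupIds`, `paramsFor` and `executorGroupId` are hypothetical helper names, not part of Spark or Kafka, and the "spark-executor-" prefix is taken from the description in this thread rather than from the Spark source.

```scala
object GroupIds {
  // Hypothetical helper: derive one group id per channel from a shared base
  // so that parallel KafkaRDDs never end up in the same consumer group.
  def paramsFor(base: Map[String, Object], channel: String): Map[String, Object] =
    base.updated("group.id", s"${base("group.id")}-$channel")

  // Per the thread, executors prepend this prefix to the group id they are given.
  def executorGroupId(params: Map[String, Object]): String =
    "spark-executor-" + params("group.id")
}

object GroupIdsDemo {
  def main(args: Array[String]): Unit = {
    val base = Map[String, Object]("group.id" -> "storage-group")
    // Each channel gets its own kafkaParams, and therefore its own group id.
    Seq("channel1", "channel2").foreach { channel =>
      val params = GroupIds.paramsFor(base, channel)
      println(s"$channel -> ${params("group.id")} / ${GroupIds.executorGroupId(params)}")
    }
  }
}
```

With a shared base id, both the driver-side id and the prefixed executor-side id collide across channels; deriving the id from the channel keeps them distinct on both sides.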



Re: Instability issues with Spark 2.0.1 and Kafka 0.10

Posted by Cody Koeninger <co...@koeninger.org>.
Someone can correct me, but I'm pretty sure Spark dstreams (in
general, not just kafka) have been progressing on to the next batch
after a given batch aborts for quite some time now.  Yet another
reason I put offsets in my database transactionally.  My jobs throw
exceptions if the offset in the DB isn't what I expected it to be.
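
A minimal sketch of that "throw if the stored offset isn't what I expected" check, assuming an in-memory map as a stand-in for the database table. `OffsetStore` and its methods are hypothetical names for illustration; a real job would perform the check and the update inside one database transaction together with the batch results.

```scala
import scala.collection.mutable

// In-memory stand-in for a database table of offsets keyed by (topic, partition).
final class OffsetStore {
  private val offsets = mutable.Map.empty[(String, Int), Long]

  // Offset stored for this partition, or 0 if nothing has been committed yet.
  def current(topic: String, partition: Int): Long =
    synchronized { offsets.getOrElse((topic, partition), 0L) }

  // Advance the stored offset only if it matches where this batch started;
  // otherwise fail loudly instead of silently skipping or repeating data.
  def commit(topic: String, partition: Int, fromOffset: Long, untilOffset: Long): Unit =
    synchronized {
      val stored = offsets.getOrElse((topic, partition), 0L)
      if (stored != fromOffset)
        throw new IllegalStateException(
          s"$topic-$partition: expected offset $fromOffset but store has $stored")
      offsets((topic, partition)) = untilOffset
    }
}
```

If a batch is skipped after a failure, the next batch's `fromOffset` no longer matches the stored value and the commit throws, so the gap is detected rather than committed over.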







Re: Instability issues with Spark 2.0.1 and Kafka 0.10

Posted by Sean McKibben <gr...@graphex.com>.
I've been encountering the same kinds of timeout issues as Ivan, using the "Kafka Stream" approach that he is using, except I'm storing my offsets manually from the driver to Zookeeper in the Kafka 8 format. I haven't yet implemented the KafkaRDD approach, and therefore don't have the concurrency issues, but a very similar use case is coming up for me soon, it's just been backburnered until I can get streaming to be more reliable (I will definitely ensure unique group IDs when I do). Offset commits are certainly more painful in Kafka 0.10, and that doesn't have anything to do with Spark.

While I may be able to alleviate the timeout by just increasing it, I've noticed something else that is more worrying: When one task fails 4 times in a row (i.e. "Failed to get records for _ after polling for _"), Spark aborts the Stage and Job with "Job aborted due to stage failure: Task _ in stage _ failed 4 times". That's fine, and it's the behavior I want, but instead of stopping the Application there (as previous versions of Spark did) the next microbatch marches on and offsets are committed ahead of the failed microbatch. Suddenly my at-least-once app becomes more sometimes-at-least-once, which is no good. In order for Spark to display that failure, I must be propagating the errors up to Spark, but the behavior of marching forward with the next microbatch seems to be new, and a big potential for data loss in streaming applications.

Am I perhaps missing a setting to stop the entire streaming application once spark.task.maxFailures is reached? Has anyone else seen this behavior of a streaming application skipping over failed microbatches?

Thanks,
Sean


> On Nov 4, 2016, at 2:48 PM, Cody Koeninger <co...@koeninger.org> wrote:
> 
> So basically what I am saying is
> 
> - increase poll.ms
> - use a separate group id everywhere
> - stop committing offsets under the covers
> 
> That should eliminate all of those as possible causes, and then we can
> see if there are still issues.
> 
> As far as 0.8 vs 0.10, Spark doesn't require you to assign or
> subscribe to a topic in order to update offsets, Kafka does.  If you
> don't like the new Kafka consumer api, the existing 0.8 simple
> consumer api should be usable with later brokers.  As long as you
> don't need SSL or dynamic subscriptions, and it meets your needs, keep
> using it.
> 


Re: Instability issues with Spark 2.0.1 and Kafka 0.10

Posted by Cody Koeninger <co...@koeninger.org>.
So basically what I am saying is

- increase poll.ms
- use a separate group id everywhere
- stop committing offsets under the covers

That should eliminate all of those as possible causes, and then we can
see if there are still issues.
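
A minimal sketch of the first two points, in plain Scala so it runs without Spark or Kafka on the classpath. The property keys are the ones already used in this thread; the `GroupIdPerChannel` object, the `paramsFor` helper, the `storage-group-<channel>` naming scheme and the 10000 ms value are assumptions made purely for illustration.

```scala
object GroupIdPerChannel {
  // Base consumer settings shared by every channel (values taken from the thread).
  val baseKafkaParams: Map[String, Object] = Map(
    "bootstrap.servers"  -> "somemachine:31000",
    "enable.auto.commit" -> (false: java.lang.Boolean),
    "auto.offset.reset"  -> "earliest"
  )

  // Assumed value: simply something well above the 2000 ms tried so far.
  val sparkSettings: Map[String, String] = Map(
    "spark.streaming.kafka.consumer.poll.ms" -> "10000"
  )

  /** Returns a copy of the base params whose group.id is unique to the channel. */
  def paramsFor(channel: String): Map[String, Object] =
    baseKafkaParams + ("group.id" -> s"storage-group-$channel")
}
```

Each RDD or stream would then be created from `paramsFor(channel)` rather than from one shared map, so no two of them present the same group id. The third point (no offset commits outside of Spark) is a matter of removing code, so it is not shown.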

As far as 0.8 vs 0.10, Spark doesn't require you to assign or
subscribe to a topic in order to update offsets, Kafka does.  If you
don't like the new Kafka consumer api, the existing 0.8 simple
consumer api should be usable with later brokers.  As long as you
don't need SSL or dynamic subscriptions, and it meets your needs, keep
using it.

On Fri, Nov 4, 2016 at 3:37 PM, Ivan von Nagy <iv...@vadio.com> wrote:
> Yes, the parallel KafkaRDD uses the same consumer group, but each RDD uses a
> single distinct topic. For example, the group would be something like
> "storage-group", and the topics would be "storage-channel1", and
> "storage-channel2". In each thread a KafkaConsumer is started, assigned the
> partitions assigned, and then commit offsets are called after the RDD is
> processed. This should not interfere with the consumer group used by the
> executors which would be "spark-executor-storage-group".
>
> In the streaming example there is a single topic ("client-events") and group
> ("processing-group"). A single stream is created and offsets are manually
> updated from the executor after each partition is handled. This was a
> challenge since Spark now requires one to assign or subscribe to a topic in
> order to even update the offsets. In 0.8.2.x you did not have to worry about
> that. This approach limits your exposure to duplicate data since idempotent
> records are not entirely possible in our scenario. At least without a lot of
> re-running of logic to de-dup.
>
> Thanks,
>
> Ivan


Re: Instability issues with Spark 2.0.1 and Kafka 0.10

Posted by Ivan von Nagy <iv...@vadio.com>.
Yes, the parallel KafkaRDD uses the same consumer group, but each RDD uses
a single distinct topic. For example, the group would be something like
"storage-group", and the topics would be "storage-channel1" and
"storage-channel2". In each thread a KafkaConsumer is started and assigned
the topic's partitions, and the offsets are committed after the RDD is
processed. This should not interfere with the consumer group used by the
executors, which would be "spark-executor-storage-group".

In the streaming example there is a single topic ("client-events") and
group ("processing-group"). A single stream is created and offsets are
manually updated from the executor after each partition is handled. This
was a challenge since Spark now requires one to assign or subscribe to a
topic in order to even update the offsets. In 0.8.2.x you did not have to
worry about that. This approach limits your exposure to duplicate data,
since idempotent records are not entirely possible in our scenario, at
least without a lot of re-running of logic to de-dup.

Thanks,

Ivan

On Fri, Nov 4, 2016 at 1:24 PM, Cody Koeninger <co...@koeninger.org> wrote:

> So just to be clear, the answers to my questions are
>
> - you are not using different group ids, you're using the same group
> id everywhere
>
> - you are committing offsets manually
>
> Right?
>
> If you want to eliminate network or kafka misbehavior as a source,
> tune poll.ms upwards even higher.
>
> You must use different group ids for different rdds or streams.
> Kafka consumers won't behave the way you expect if they are all in the
> same group id, and the consumer cache is keyed by group id. Yes, the
> executor will tack "spark-executor-" on to the beginning, but if you
> give it the same base group id, it will be the same.  And the driver
> will use the group id you gave it, unmodified.
>
> Finally, I really can't help you if you're manually writing your own
> code to commit offsets directly to Kafka.  Trying to minimize
> duplicates that way doesn't really make sense, your system must be
> able to handle duplicates if you're using kafka as an offsets store,
> it can't do transactional exactly once.

Re: Instability issues with Spark 2.0.1 and Kafka 0.10

Posted by Cody Koeninger <co...@koeninger.org>.
So just to be clear, the answers to my questions are

- you are not using different group ids, you're using the same group
id everywhere

- you are committing offsets manually

Right?

If you want to eliminate network or Kafka misbehavior as a source,
tune poll.ms upwards even higher.

You must use different group ids for different RDDs or streams.
Kafka consumers won't behave the way you expect if they are all in the
same group id, and the consumer cache is keyed by group id. Yes, the
executor will tack "spark-executor-" on to the beginning, but if you
give it the same base group id, it will be the same.  And the driver
will use the group id you gave it, unmodified.
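
As an illustration of the group id point, the following toy model (plain Scala, not Spark's actual classes) applies the "spark-executor-" prefix described above and builds a cache key from group id, topic and partition. The names `ConsumerCacheModel`, `CacheKey`, `executorGroupId` and `keyFor` are assumptions made for this sketch.

```scala
object ConsumerCacheModel {
  // Toy stand-in for the key under which an executor caches a consumer.
  final case class CacheKey(groupId: String, topic: String, partition: Int)

  // The executor-side group id is the user's group id with a fixed prefix.
  def executorGroupId(baseGroupId: String): String =
    "spark-executor-" + baseGroupId

  // Key that a task reading the given topic partition would look up.
  def keyFor(baseGroupId: String, topic: String, partition: Int): CacheKey =
    CacheKey(executorGroupId(baseGroupId), topic, partition)
}
```

In this model, two jobs that both pass "storage-group" and read partition 0 of the same topic produce equal keys, so they would be handed the same cached consumer. Giving each job its own base group id yields different keys even though the prefix is identical.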

Finally, I really can't help you if you're manually writing your own
code to commit offsets directly to Kafka.  Trying to minimize
duplicates that way doesn't really make sense: your system must be
able to handle duplicates if you're using Kafka as an offsets store,
because it can't do transactional exactly once.

On Fri, Nov 4, 2016 at 1:48 PM, vonnagy <iv...@vadio.com> wrote:
> Here are some examples and details of the scenarios. The KafkaRDD is the most
> prone to polling timeouts and concurrent modification errors.
>
> *Using KafkaRDD* - This takes a list of channels and processes them in
> parallel using the KafkaRDD directly. They all use the same consumer group
> ('storage-group'), but each has its own topic and each topic has 4
> partitions. We routinely get timeout errors when polling for data. This
> occurs whether we process in parallel or sequentially.
>
> *Spark Kafka setting:*
> spark.streaming.kafka.consumer.poll.ms=2000
>
> *Kafka Consumer Params:*
> metric.reporters = []
> metadata.max.age.ms = 300000
> partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor]
> reconnect.backoff.ms = 50
> sasl.kerberos.ticket.renew.window.factor = 0.8
> max.partition.fetch.bytes = 1048576
> bootstrap.servers = [somemachine:31000]
> ssl.keystore.type = JKS
> enable.auto.commit = false
> sasl.mechanism = GSSAPI
> interceptor.classes = null
> exclude.internal.topics = true
> ssl.truststore.password = null
> client.id =
> ssl.endpoint.identification.algorithm = null
> max.poll.records = 1000
> check.crcs = true
> request.timeout.ms = 40000
> heartbeat.interval.ms = 3000
> auto.commit.interval.ms = 5000
> receive.buffer.bytes = 65536
> ssl.truststore.type = JKS
> ssl.truststore.location = null
> ssl.keystore.password = null
> fetch.min.bytes = 1
> send.buffer.bytes = 131072
> value.deserializer = class com.vadio.analytics.spark.storage.ClientEventJsonOptionDeserializer
> group.id = storage-group
> retry.backoff.ms = 100
> sasl.kerberos.kinit.cmd = /usr/bin/kinit
> sasl.kerberos.service.name = null
> sasl.kerberos.ticket.renew.jitter = 0.05
> ssl.trustmanager.algorithm = PKIX
> ssl.key.password = null
> fetch.max.wait.ms = 500
> sasl.kerberos.min.time.before.relogin = 60000
> connections.max.idle.ms = 540000
> session.timeout.ms = 30000
> metrics.num.samples = 2
> key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
> ssl.protocol = TLS
> ssl.provider = null
> ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
> ssl.keystore.location = null
> ssl.cipher.suites = null
> security.protocol = PLAINTEXT
> ssl.keymanager.algorithm = SunX509
> metrics.sample.window.ms = 30000
> auto.offset.reset = earliest
>
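> *Example usage with KafkaRDD:*
> import scala.collection.JavaConverters._
> import org.apache.kafka.clients.consumer.{KafkaConsumer, OffsetAndMetadata}
> import org.apache.kafka.common.TopicPartition
> import org.apache.spark.sql.SaveMode
> import org.apache.spark.streaming.kafka010.KafkaUtils
> import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
>
> val channels = Seq("channel1", "channel2")
>
> channels.toParArray.foreach { channel =>
>   val consumer = new KafkaConsumer[K, V](kafkaParams.asJava)
>
>   // Get offsets for the given topic and the consumer group 'storage-group'
>   val offsetRanges = getOffsets("storage-group", channel)
>
>   // Keep only the record values; toDS() needs spark.implicits._ and an
>   // Encoder[V] in scope
>   val ds = KafkaUtils.createRDD[K, V](context,
>         kafkaParams.asJava,
>         offsetRanges,
>         PreferConsistent).map(_.value).toDS()
>
>   // Do some aggregations
>   ds.agg(...)
>   // Save the data
>   ds.write.mode(SaveMode.Append).parquet(somePath)
>   // Save offsets using a KafkaConsumer (assumed here to be the end of each
>   // processed range)
>   val newOffsets = offsetRanges.map { o =>
>     new TopicPartition(o.topic, o.partition) -> new OffsetAndMetadata(o.untilOffset)
>   }.toMap
>   consumer.commitSync(newOffsets.asJava)
>   consumer.close()
> }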
>
>
> *Example usage with Kafka Stream:*
> This creates a stream and processes events in each partition. At the end of
> processing for each partition, we update the offsets for that partition. This
> is challenging to do, but is better than calling commitAsync on the stream,
> because that occurs after the /entire/ RDD has been processed. This method
> minimizes duplicates in an exactly once environment. Since the executors use
> their own custom group "spark-executor-processor-group" and the commit is
> buried in private functions, we are unable to use the executor's cached
> consumer to update the offsets. This requires us to go through multiple steps
> to update the Kafka offsets accordingly.
>
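> import org.apache.spark.TaskContext
> import org.apache.spark.streaming.kafka010.{HasOffsetRanges, KafkaUtils}
> import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
> import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
>
> // Starting offsets for the stream (renamed so it is not shadowed below)
> val fromOffsets = getOffsets("processor-group", "my-topic")
>
> // Scala overload of Subscribe: topics, params and offsets are Scala collections
> val stream = KafkaUtils.createDirectStream[K, V](context,
>       PreferConsistent,
>       Subscribe[K, V](Seq("my-topic"),
>         kafkaParams,
>         fromOffsets))
>
> stream.foreachRDD { rdd =>
>     val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
>
>     // Transform our data
>    rdd.foreachPartition { events =>
>        // Establish a consumer in the executor so we can update offsets after
>        // each partition. This class is homegrown and uses the KafkaConsumer
>        // to help get/set offsets
>        val consumer = new ConsumerUtils(kafkaParams)
>        // Offset range of the Kafka partition this task is processing
>        val o = offsetRanges(TaskContext.get.partitionId)
>        // do something with our data
>
>        // Write the offsets that were updated in this partition
>        consumer.setConsumerOffsets("processor-group",
>           Map(TopicAndPartition(o.topic, o.partition) -> o.untilOffset))
>    }
> }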


Re: Instability issues with Spark 2.0.1 and Kafka 0.10

Posted by vonnagy <iv...@vadio.com>.
Here are some examples and details of the scenarios. The KafkaRDD is the most
prone to polling timeouts and concurrent modification errors.

*Using KafkaRDD* - This takes a list of channels and processes them in
parallel using the KafkaRDD directly. They all use the same consumer group
('storage-group'), but each has its own topic and each topic has 4
partitions. We routinely get timeout errors when polling for data. This
occurs whether we process in parallel or sequentially.

*Spark Kafka setting:*
spark.streaming.kafka.consumer.poll.ms=2000

*Kafka Consumer Params:*
metric.reporters = []
metadata.max.age.ms = 300000
partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor]
reconnect.backoff.ms = 50
sasl.kerberos.ticket.renew.window.factor = 0.8
max.partition.fetch.bytes = 1048576
bootstrap.servers = [somemachine:31000]
ssl.keystore.type = JKS
enable.auto.commit = false
sasl.mechanism = GSSAPI
interceptor.classes = null
exclude.internal.topics = true
ssl.truststore.password = null
client.id = 
ssl.endpoint.identification.algorithm = null
max.poll.records = 1000
check.crcs = true
request.timeout.ms = 40000
heartbeat.interval.ms = 3000
auto.commit.interval.ms = 5000
receive.buffer.bytes = 65536
ssl.truststore.type = JKS
ssl.truststore.location = null
ssl.keystore.password = null
fetch.min.bytes = 1
send.buffer.bytes = 131072
value.deserializer = class com.vadio.analytics.spark.storage.ClientEventJsonOptionDeserializer
group.id = storage-group
retry.backoff.ms = 100
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
ssl.trustmanager.algorithm = PKIX
ssl.key.password = null
fetch.max.wait.ms = 500
sasl.kerberos.min.time.before.relogin = 60000
connections.max.idle.ms = 540000
session.timeout.ms = 30000
metrics.num.samples = 2
key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
ssl.protocol = TLS
ssl.provider = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.keystore.location = null
ssl.cipher.suites = null
security.protocol = PLAINTEXT
ssl.keymanager.algorithm = SunX509
metrics.sample.window.ms = 30000
auto.offset.reset = earliest
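
For illustration, the settings listed above could be handed to the consumer as a `kafkaParams` map along the following lines. This is a minimal sketch, not the poster's actual code: the object name `KafkaSettings` is hypothetical, only a subset of the listed values is shown, and the deserializers are given as class-name strings so that the snippet needs nothing beyond the Scala standard library.

```scala
// Hypothetical holder object; the values are copied from the settings listed above.
object KafkaSettings {
  // Spark-side setting: poll timeout (in ms) used when fetching records on the executors.
  val sparkSettings: Map[String, String] = Map(
    "spark.streaming.kafka.consumer.poll.ms" -> "2000"
  )

  // Kafka consumer parameters (subset). Deserializers are passed as class names.
  val kafkaParams: Map[String, Object] = Map[String, Object](
    "bootstrap.servers" -> "somemachine:31000",
    "group.id" -> "storage-group",
    "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
    "value.deserializer" -> "com.vadio.analytics.spark.storage.ClientEventJsonOptionDeserializer",
    "enable.auto.commit" -> (false: java.lang.Boolean),
    "auto.offset.reset" -> "earliest",
    "max.poll.records" -> (1000: java.lang.Integer),
    "max.partition.fetch.bytes" -> (1048576: java.lang.Integer),
    "request.timeout.ms" -> (40000: java.lang.Integer),
    "session.timeout.ms" -> (30000: java.lang.Integer)
  )
}
```

With `scala.collection.JavaConverters._` in scope, `KafkaSettings.kafkaParams.asJava` gives the `java.util.Map` form that the examples below pass to `KafkaUtils.createRDD`.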

*Example usage with KafkaRDD:*
val channels = Seq("channel1", "channel2")

channels.toParArray.foreach { channel =>
  val consumer = new KafkaConsumer[K, V](kafkaParams.asJava)

  // Get offsets for the given topic and the consumer group 'storage-group'
  val offsetRanges = getOffsets("storage-group", channel)

  val ds = KafkaUtils.createRDD[K, V](context,
        kafkaParams asJava,
        offsetRanges,
        PreferConsistent).toDS[V]

  // Do some aggregations
  ds.agg(...)
  // Save the data
  ds.write.mode(SaveMode.Append).parquet(somePath)
  // Save offsets using a KafkaConsumer
  consumer.commitSync(newOffsets.asJava)
  consumer.close()
}
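
The example above does not show how `newOffsets` is built. One possible way, sketched here under the assumption that `getOffsets` returns the `Array[OffsetRange]` that `createRDD` requires and that every record in each range was processed and written successfully, is to commit each range's `untilOffset`. The object and method names (`OffsetCommits`, `toCommitMap`) are hypothetical and not part of Spark or Kafka.

```scala
import org.apache.kafka.clients.consumer.OffsetAndMetadata
import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.kafka010.OffsetRange

// Hypothetical helper: turns the ranges that were read into the offsets to commit.
object OffsetCommits {
  def toCommitMap(ranges: Seq[OffsetRange]): Map[TopicPartition, OffsetAndMetadata] =
    ranges.map { r =>
      // untilOffset is exclusive, so it is also the next offset to read.
      new TopicPartition(r.topic, r.partition) -> new OffsetAndMetadata(r.untilOffset)
    }.toMap
}
```

In the loop above this would be used as `consumer.commitSync(OffsetCommits.toCommitMap(offsetRanges).asJava)`, after the Parquet write has completed.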


*Example usage with Kafka Stream:*
This creates a stream and processes events in each partition. At the end of
processing for each partition, we update the offsets for that partition. This
is challenging to do, but is better than calling commitAsync on the stream,
because that occurs after the /entire/ RDD has been processed. This method
minimizes duplicates in an exactly-once environment. Since the executors use
their own custom group "spark-executor-processor-group" and the commit is
buried in private functions, we are unable to use the executors' cached
consumer to update the offsets. This requires us to go through multiple steps
to update the Kafka offsets accordingly.

val offsetRanges = getOffsets("processor-group", "my-topic")

val stream = KafkaUtils.createDirectStream[K, V](context,
      PreferConsistent,
      Subscribe[K, V](Seq("my-topic") asJavaCollection,
        kafkaParams,
        offsetRanges))

stream.foreachRDD { rdd =>
    val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

    // Transform our data
   rdd.foreachPartition { events =>
       // Offset range for the Kafka partition handled by this task
       val range = offsetRanges(TaskContext.get.partitionId)

       // Establish a consumer in the executor so we can update offsets after each partition.
       // This class is homegrown and uses the KafkaConsumer to help get/set offsets
       val consumer = new ConsumerUtils(kafkaParams)
       // do something with our data

       // Write the offsets that were updated in this partition
       // (assumes the whole range was processed, so the end offset is range.untilOffset)
       consumer.setConsumerOffsets("processor-group",
          Map(TopicAndPartition(range.topic, range.partition) -> range.untilOffset))
   }
}
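
For comparison, the stream-level commit that the description above contrasts against looks roughly like the following. This is a sketch of the commonly documented `CanCommitOffsets` pattern rather than code from this job; the names `CommitAfterBatch`, `processAndCommit` and `handle` are hypothetical, and `handle` must be serializable because it runs on the executors.

```scala
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges}

// Hypothetical wrapper around the "commit once per batch" pattern.
object CommitAfterBatch {
  def processAndCommit[K, V](stream: InputDStream[ConsumerRecord[K, V]])(
      handle: Iterator[ConsumerRecord[K, V]] => Unit): Unit =
    stream.foreachRDD { rdd =>
      // Capture the ranges before any transformation loses the Kafka partitioning.
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      // Process every partition of the batch on the executors.
      rdd.foreachPartition(handle)
      // Queue the commit on the driver; it only happens once the whole RDD is done.
      stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
    }
}
```

Because the commit is issued only after `foreachPartition` has finished for the entire RDD, a failure late in the batch can replay partitions that had already been processed, which is the duplication the per-partition approach above tries to reduce.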



--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Instability-issues-with-Spark-2-0-1-and-Kafka-0-10-tp28017p28020.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.



Re: Instability issues with Spark 2.0.1 and Kafka 0.10

Posted by Cody Koeninger <co...@koeninger.org>.
- are you using different group ids for the different streams?
- are you manually committing offsets?
- what are the values of your kafka-related settings?

On Fri, Nov 4, 2016 at 12:20 PM, vonnagy <iv...@vadio.com> wrote:
> I am getting the issues using Spark 2.0.1 and Kafka 0.10. I have two jobs,
> one that uses a Kafka stream and one that uses just the KafkaRDD.
>
> With the KafkaRDD, I continually get the "Failed to get records .. after
> polling". I have adjusted the polling with
> `spark.streaming.kafka.consumer.poll.ms` and the size of records with
> Kafka's `max.poll.records`. Even when it gets records it is extremely slow.
>
> When working with multiple KafkaRDDs in parallel I get the dreaded
> `ConcurrentModificationException`. The Spark logic is supposed to use a
> CachedKafkaConsumer based on the topic and partition. This is supposed to
> guarantee thread safety, but I continually get this error along with the
> polling timeout.
>
> Has anyone else tried to use Spark 2 with Kafka 0.10 and had any success. At
> this point it is completely useless in my experience. With Spark 1.6 and
> Kafka 0.8.x, I never had these problems.