Posted to user@spark.apache.org by Cheng Yi <ph...@gmail.com> on 2016/09/08 21:44:08 UTC

spark streaming kafka connector questions

I am using the latest Spark Streaming Kafka connector:
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka_2.11</artifactId>
<version>1.6.2</version>

I am facing a problem where a message is delivered twice to my
consumers. The two deliveries are 10+ seconds apart. It looks like this is
caused by my lengthy message processing (about 60 seconds). I have tried
to solve it, but I am still stuck.

1. It looks like the Kafka streaming connector supports Kafka v0.8 and
maybe v0.9, but not v0.10:

JavaPairInputDStream<String, String> ds = KafkaUtils.createDirectStream(
        jsc,
        String.class, String.class,
        StringDecoder.class, StringDecoder.class,
        kafkaParams, topicsSet);

2. After I get a message from the Kafka stream via the consumer, how can I
commit it without finishing the whole processing (which might take
minutes)? It looks like I can't get the consumer from KafkaUtils to call
the Kafka commit API.

3. If I can't commit manually, then I need to tell the Kafka consumer to
allow a longer session or to auto commit. For v0.8 or v0.9, I have tried
passing the following properties to KafkaUtils:

kafkaParams.put("auto.commit.enable", "true");
kafkaParams.put("auto.commit.interval.ms", "1000");
kafkaParams.put("zookeeper.session.timeout.ms", "60000");
kafkaParams.put("zookeeper.connection.timeout.ms", "60000");

It is still not working.
Help is greatly appreciated!




--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/spark-streaming-kafka-connector-questions-tp27681.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe e-mail: user-unsubscribe@spark.apache.org


Re: spark streaming kafka connector questions

Posted by 毅程 <ph...@gmail.com>.
Cody, thanks for the message.

1. As you mentioned, I did find the version for Kafka 0.10.1. I will use
that, although it has lots of experimental tags. Thank you.
2. I have investigated thoroughly; it is NOT the scenario where a 1st
process failed and then a 2nd process was triggered.
3. I agree that the session timeout and auto commit are not the root cause
here.
4. The problem I see is likely caused by a filter and union of the DStream
(I will try to elaborate in another question post).
If I just do kafka-stream -- process -- output operator, then there is no
problem.
If I do
kafka-stream -- process(1) - filter a stream A for later union --|
                                       |_ filter a stream B  -- process(2)
-----|_ A union B output process (3)
the duplicate message starts processing at the end of process(1); see the
following traces:

16/09/10 00:11:00 INFO CachedKafkaConsumer: Initial fetch for
spark-executor-testgid log-analysis-topic 2 1 *(fetch EVENT 1st time)*

16/09/10 00:11:00 INFO AbstractCoordinator: Discovered coordinator
192.168.2.6:9092 (id: 2147483647 rack: null) for group
spark-executor-testgid.

log of processing (1) for event 1

16/09/10 00:11:03 INFO Executor: Finished task 0.0 in stage 9.0 (TID 36).
1401 bytes result sent to driver

16/09/10 00:11:03 INFO TaskSetManager: Finished task 0.0 in stage 9.0 (TID
36) in 3494 ms on localhost (3/3)

16/09/10 00:11:03 INFO TaskSchedulerImpl: Removed TaskSet 9.0, whose tasks
have all completed, from pool

16/09/10 00:11:03 INFO DAGScheduler: ShuffleMapStage 9 (flatMapToPair
(*processing (1)*) at SparkAppDriver.java:136) finished in 3.506 s

16/09/10 00:11:03 INFO DAGScheduler: looking for newly runnable stages

16/09/10 00:11:03 INFO DAGScheduler: running: Set()

16/09/10 00:11:03 INFO DAGScheduler: waiting: Set(ShuffleMapStage 10,
ResultStage 11)

16/09/10 00:11:03 INFO DAGScheduler: failed: Set()

16/09/10 00:11:03 INFO DAGScheduler: Submitting ShuffleMapStage 10
(UnionRDD[41] at union (*process (3)*) at SparkAppDriver.java:155), which
has no missing parents

16/09/10 00:11:03 INFO DAGScheduler: Submitting 6 missing tasks from
ShuffleMapStage 10 (UnionRDD[41] at union at SparkAppDriver.java:155)

16/09/10 00:11:03 INFO KafkaRDD: Computing topic log-analysis-topic,
partition 2 offsets 1 -> 2

16/09/10 00:11:03 INFO CachedKafkaConsumer: Initial fetch for
spark-executor-testgid log-analysis-topic 2 1 *(fetch the same EVENT 2nd
time)*

16/09/10 00:11:10 INFO JobScheduler: Finished job streaming job
1473491460000 ms.0 from job set of time 1473491460000 ms

16/09/10 00:11:10 INFO JobScheduler: Total delay: 10.920 s for time
1473491460000 ms (execution: 10.874 s) (EVENT 1st time process cost 10.874
s)

16/09/10 00:11:10 INFO JobScheduler: Finished job streaming job
1473491465000 ms.0 from job set of time 1473491465000 ms

16/09/10 00:11:10 INFO JobScheduler: Total delay: 5.986 s for time
1473491465000 ms (execution: 0.066 s) (EVENT 2nd time process cost 0.066)

The 2nd processing of the event finished without really doing the work.
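
One way to spot this pattern without reading the trace by eye is to count
"Initial fetch" lines per topic, partition and offset. The following is a
minimal sketch in plain Java, not part of Spark. It assumes each log entry
is on a single line (the entries above are wrapped by the mail client) and
that the four fields after "Initial fetch for" are group id, topic,
partition and offset, which is inferred from the trace above rather than
confirmed. The class and method names are hypothetical.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class DuplicateFetchFinder {

    // Assumed field order: group id, topic, partition, offset (inferred from the trace).
    private static final Pattern INITIAL_FETCH = Pattern.compile(
            "CachedKafkaConsumer: Initial fetch for (\\S+) (\\S+) (\\d+) (\\d+)");

    /** Counts "Initial fetch" log lines, keyed by "topic-partition@offset". */
    static Map<String, Integer> countInitialFetches(List<String> logLines) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        for (String line : logLines) {
            Matcher m = INITIAL_FETCH.matcher(line);
            if (m.find()) {
                String key = m.group(2) + "-" + m.group(3) + "@" + m.group(4);
                counts.merge(key, 1, Integer::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        // Three unwrapped lines taken from the trace above.
        List<String> lines = Arrays.asList(
                "16/09/10 00:11:00 INFO CachedKafkaConsumer: Initial fetch for "
                        + "spark-executor-testgid log-analysis-topic 2 1",
                "16/09/10 00:11:03 INFO KafkaRDD: Computing topic log-analysis-topic, "
                        + "partition 2 offsets 1 -> 2",
                "16/09/10 00:11:03 INFO CachedKafkaConsumer: Initial fetch for "
                        + "spark-executor-testgid log-analysis-topic 2 1");

        // Report only the offsets that were fetched more than once.
        countInitialFetches(lines).forEach((key, n) -> {
            if (n > 1) {
                System.out.println(key + " fetched " + n + " times");
            }
        });
    }
}
```

A count above 1 for the same key within one batch suggests the Kafka RDD
was computed more than once, which is what the trace shows for partition 2,
offset 1.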

2016-09-08 14:55 GMT-07:00 Cody Koeninger <co...@koeninger.org>:


Re: spark streaming kafka connector questions

Posted by Cody Koeninger <co...@koeninger.org>.
- If you're seeing repeated attempts to process the same message, you
should be able to look in the UI or logs and see that a task has
failed.  Figure out why that task failed before chasing other things.

- You're not using the latest version; the latest version is for Spark
2.0.  There are two versions of the connector for Spark 2.0, one for
Kafka 0.8 or higher, and one for Kafka 0.10 or higher.

- Committing individual messages to Kafka doesn't make any sense;
Spark Streaming deals with batches.  If you're doing any aggregations
that involve shuffling, there isn't even a guarantee that you'll
process messages in order for a given topicpartition.

- Auto commit has no effect for the 0.8 version of createDirectStream.
Turning it on for the 0.10 version of createDirectStream is a really
bad idea: it will give you undefined delivery semantics, because the
commit to Kafka is unrelated to whether the batch processed
successfully.

If you're unclear on how the kafka integration works, see

https://github.com/koeninger/kafka-exactly-once
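
The point about tying the commit to batch success can be illustrated with a
small plain-Java model. This is only a sketch under stated assumptions, not
the connector's API: the Partition class, the run method and the simulated
failure are all hypothetical. It commits an offset only after a whole batch
has been processed, so a failed batch is replayed from the last committed
offset (at-least-once) and nothing is skipped. In the 0.10 connector the
corresponding hook should be committing the batch's offset ranges on the
direct stream after the output operation completes; check the connector
documentation for the exact call.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class CommitAfterBatchDemo {

    /** Hypothetical stand-in for one topic partition plus its committed offset. */
    static final class Partition {
        final List<String> messages;
        int committedOffset = 0;

        Partition(List<String> messages) {
            this.messages = messages;
        }
    }

    /**
     * Processes the partition in fixed-size batches and commits only after a
     * batch succeeds. The batch attempt numbered failOnAttempt throws after
     * its first message to simulate a task failure (use -1 for no failure).
     * Returns every message handed to the processor, including replays.
     */
    static List<String> run(Partition p, int batchSize, int failOnAttempt) {
        List<String> seen = new ArrayList<>();
        int attempt = 0;
        while (p.committedOffset < p.messages.size()) {
            int from = p.committedOffset;
            int until = Math.min(from + batchSize, p.messages.size());
            List<String> batch = p.messages.subList(from, until);
            try {
                for (int i = 0; i < batch.size(); i++) {
                    seen.add(batch.get(i));
                    if (attempt == failOnAttempt && i == 0) {
                        throw new RuntimeException("simulated task failure");
                    }
                }
                // Commit only once the whole batch has been processed.
                p.committedOffset = until;
            } catch (RuntimeException e) {
                // No commit: the next iteration restarts from the last committed offset.
            }
            attempt++;
        }
        return seen;
    }

    public static void main(String[] args) {
        Partition p = new Partition(Arrays.asList("m0", "m1", "m2", "m3"));
        // The first batch attempt fails after m0, so m0 is processed twice.
        System.out.println(run(p, 2, 0)); // prints [m0, m0, m1, m2, m3]
    }
}
```

A timer-based auto commit would instead advance the committed offset
regardless of whether the batch finished, so after a failure messages could
be either replayed or skipped depending on timing.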



Re: spark streaming kafka connector questions

Posted by 毅程 <ph...@gmail.com>.
Thanks, that is what I was missing. I have added a cache before the
action, and the 2nd processing is now avoided.

2016-09-10 5:10 GMT-07:00 Cody Koeninger <co...@koeninger.org>:

> Hard to say without seeing the code, but if you do multiple actions on an
> RDD without caching, the RDD will be computed multiple times.

Re: spark streaming kafka connector questions

Posted by Cody Koeninger <co...@koeninger.org>.
Hard to say without seeing the code, but if you do multiple actions on an
RDD without caching, the RDD will be computed multiple times.
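
The effect can be modelled in plain Java without Spark. The sketch below is
an illustration only: a Supplier stands in for the lazily evaluated lineage
(Kafka fetch plus process(1)), two filters stand in for streams A and B, and
a hypothetical memoize helper stands in for calling cache() or persist() on
the parent before branching. None of these names come from the original
code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import java.util.stream.Collectors;

public class RecomputeDemo {

    /** Wraps a supplier so its result is computed once and then reused. */
    static <T> Supplier<T> memoize(Supplier<T> source) {
        return new Supplier<T>() {
            private T value;
            private boolean computed;

            @Override
            public synchronized T get() {
                if (!computed) {
                    value = source.get();
                    computed = true;
                }
                return value;
            }
        };
    }

    /** Runs the filter/union topology and returns how often the source was fetched. */
    static int countFetches(boolean cached) {
        AtomicInteger fetches = new AtomicInteger();

        // Lazy parent: "fetch from Kafka" plus process(1). Runs on every get().
        Supplier<List<String>> processed = () -> {
            fetches.incrementAndGet();
            List<String> out = new ArrayList<>();
            out.add("A:event-1");
            out.add("B:event-2");
            return out;
        };
        Supplier<List<String>> parent = cached ? memoize(processed) : processed;

        // Stream A: filtered and kept for the later union.
        List<String> a = parent.get().stream()
                .filter(s -> s.startsWith("A:"))
                .collect(Collectors.toList());

        // Stream B: filtered, then process(2). This pulls from the parent again.
        List<String> b = parent.get().stream()
                .filter(s -> s.startsWith("B:"))
                .map(s -> s + ":processed")
                .collect(Collectors.toList());

        // process(3): union of A and B.
        List<String> union = new ArrayList<>(a);
        union.addAll(b);
        if (union.size() != 2) {
            throw new IllegalStateException("unexpected union size: " + union.size());
        }
        return fetches.get();
    }

    public static void main(String[] args) {
        System.out.println("fetches without cache: " + countFetches(false)); // prints 2
        System.out.println("fetches with cache:    " + countFetches(true));  // prints 1
    }
}
```

Without memoization each branch walks the lineage back to the source, which
matches the two "Initial fetch" lines in the trace; with it, the parent is
computed once and both branches reuse the result.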

On Sep 10, 2016 2:43 AM, "Cheng Yi" <ph...@gmail.com> wrote:


Re: spark streaming kafka connector questions

Posted by Cheng Yi <ph...@gmail.com>.
After some investigation, the problem I see is likely caused by a filter
and union of the DStream.
If I just do kafka-stream -- process -- output operator, then there is no
problem; each event is fetched once.
If I do
kafka-stream -- process(1) - filter a stream A for later union --|
                                       |_ filter a stream B  -- process(2)
-----|_ A union B output process (3)
the event is fetched 2 times; the duplicate message starts processing at
the end of process(1). See the following traces:

16/09/10 00:11:00 INFO CachedKafkaConsumer: Initial fetch for
spark-executor-testgid log-analysis-topic 2 1 *(fetch EVENT 1st time)*

16/09/10 00:11:00 INFO AbstractCoordinator: Discovered coordinator
192.168.2.6:9092 (id: 2147483647 rack: null) for group
spark-executor-testgid.

log of processing (1) for event 1

16/09/10 00:11:03 INFO Executor: Finished task 0.0 in stage 9.0 (TID 36).
1401 bytes result sent to driver

16/09/10 00:11:03 INFO TaskSetManager: Finished task 0.0 in stage 9.0 (TID
36) in 3494 ms on localhost (3/3)

16/09/10 00:11:03 INFO TaskSchedulerImpl: Removed TaskSet 9.0, whose tasks
have all completed, from pool 

16/09/10 00:11:03 INFO DAGScheduler: ShuffleMapStage 9 (flatMapToPair
(*processing (1)*) at SparkAppDriver.java:136) finished in 3.506 s

16/09/10 00:11:03 INFO DAGScheduler: looking for newly runnable stages

16/09/10 00:11:03 INFO DAGScheduler: running: Set()

16/09/10 00:11:03 INFO DAGScheduler: waiting: Set(ShuffleMapStage 10,
ResultStage 11)

16/09/10 00:11:03 INFO DAGScheduler: failed: Set()

16/09/10 00:11:03 INFO DAGScheduler: Submitting ShuffleMapStage 10
(UnionRDD[41] at union (*process (3)*) at SparkAppDriver.java:155), which
has no missing parents

16/09/10 00:11:03 INFO DAGScheduler: Submitting 6 missing tasks from
ShuffleMapStage 10 (UnionRDD[41] at union at SparkAppDriver.java:155)

16/09/10 00:11:03 INFO KafkaRDD: Computing topic log-analysis-topic,
partition 2 offsets 1 -> 2

16/09/10 00:11:03 INFO CachedKafkaConsumer: Initial fetch for
spark-executor-testgid log-analysis-topic 2 1 *(fetch the same EVENT 2nd
time)*

16/09/10 00:11:10 INFO JobScheduler: Finished job streaming job
1473491460000 ms.0 from job set of time 1473491460000 ms

16/09/10 00:11:10 INFO JobScheduler: Total delay: 10.920 s for time
1473491460000 ms (execution: 10.874 s) *(EVENT 1st time process cost 10.874
s)*

16/09/10 00:11:10 INFO JobScheduler: Finished job streaming job
1473491465000 ms.0 from job set of time 1473491465000 ms

16/09/10 00:11:10 INFO JobScheduler: Total delay: 5.986 s for time
1473491465000 ms (execution: 0.066 s) *(EVENT 2nd time process cost 0.066)*

The 2nd processing of the event finished without really doing the work.

Help is hugely appreciated.


