Posted to users@kafka.apache.org by Jerry Wong <je...@gmail.com> on 2016/02/16 22:33:41 UTC

Optimize the performance of inserting data to Cassandra with Kafka and Spark Streaming

Hello everybody,

I have a question about using Spark Streaming to consume data from Kafka
and insert it into a Cassandra database. I am not sure whether the Kafka
users mailing list is the right place to post it, but I would appreciate
any suggestions.

5 AWS instances (each with 8 cores and 30 GB memory) for Spark, Hadoop,
and Cassandra
Scala: 2.10.5
Spark: 1.2.2
Hadoop: 1.2.1
Cassandra 2.0.18

3 AWS instances for the Kafka cluster (each with 8 cores and 30 GB memory)
Kafka: 0.8.2.1
Zookeeper: 3.4.6

Other configurations:
batchInterval = 6 Seconds
blockInterval = 1500 millis
spark.locality.wait = 500 millis
#Consumers = 10

The Cassandra table keySpaceOfTopicA.tableOfTopicA has two columns,
"createdtime" and "log".

Here is the relevant code:

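// One receiver-based Kafka input stream per consumer; Map("topicA" -> 1)
// asks each receiver to read topicA with a single consumer thread.
@transient val kstreams = (1 to numConsumers.toInt).map { _ =>
  KafkaUtils.createStream(ssc, zkeeper, groupId, Map("topicA" -> 1),
      StorageLevel.MEMORY_AND_DISK_SER)
    .map(_._2.toString).map(Tuple1(_))
    // Pair each message with the wall-clock time at which it is processed.
    .map { case (log) => (System.currentTimeMillis(), log) }
}
// Merge all receiver streams into a single DStream.
@transient val unifiedMessage = ssc.union(kstreams)

// Write each (createdtime, log) pair as one row.
unifiedMessage.saveToCassandra("keySpaceOfTopicA", "tableOfTopicA",
  SomeColumns("createdtime", "log"))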

I created a producer that sends messages to the brokers (1000 messages
per round).

However, only about 100 messages are inserted into Cassandra in each test
round.
Can anybody advise why the other messages (about 900) are not consumed?
How should I configure and tune the parameters to improve consumer
throughput?

Thank you very much for reading, and thanks in advance for any suggestions.

Jerry Wong

Re: Optimize the performance of inserting data to Cassandra with Kafka and Spark Streaming

Posted by Jerry Wong <je...@gmail.com>.
Hi LiYuan,

Thank you very much for your response.
The problem is solved. Many messages were created at almost the same time
(even at millisecond resolution), so their "createdtime" keys were
identical. I changed the key to use "UUID.randomUUID()", and now all
messages are inserted into the Cassandra table without any time lag.
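
A minimal sketch of why this fix helps, assuming "createdtime" was the table's primary key (the schema is not shown in this thread). Cassandra treats an insert on an existing primary key as an overwrite, so rows that share a millisecond timestamp collapse into one. The `upsertAll` helper and the in-memory `Map` are hypothetical stand-ins for the table, not part of the Spark or Cassandra APIs.

```scala
import java.util.UUID

object KeyCollisionSketch {
  // Hypothetical stand-in for a Cassandra table keyed by K:
  // writing an existing key replaces the previous row (upsert semantics).
  def upsertAll[K](rows: Seq[(K, String)]): Map[K, String] =
    rows.foldLeft(Map.empty[K, String]) {
      case (table, (key, log)) => table + (key -> log)
    }

  def main(args: Array[String]): Unit = {
    val logs = (1 to 1000).map(i => s"message-$i")

    // Millisecond timestamps: a tight loop yields many identical keys,
    // so later rows overwrite earlier ones.
    val byTimestamp =
      upsertAll(logs.map(log => (System.currentTimeMillis(), log)))

    // Random UUIDs: each row gets its own key, so nothing is overwritten.
    val byUuid = upsertAll(logs.map(log => (UUID.randomUUID(), log)))

    println(s"rows kept with timestamp keys: ${byTimestamp.size}")
    println(s"rows kept with UUID keys:      ${byUuid.size}")
  }
}
```

The exact count for timestamp keys depends on how fast the loop runs, but it is typically far below 1000, which matches the symptom described in the original question.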

Regards,
Jerry Wong


On Wed, Feb 17, 2016 at 9:43 PM, yuanjia8947@163.com <yu...@163.com>
wrote:

> Hi Jerry,
>     1. Make sure that all 1000 messages have been sent to Kafka before
> consuming.
>     2. If you don't care about the ordering between messages, you can
> use multiple partitions and more consumers.
>
>
>
> LiYuanJia

Re: Optimize the performance of inserting data to Cassandra with Kafka and Spark Streaming

Posted by "yuanjia8947@163.com" <yu...@163.com>.
Hi Jerry,
    1. Make sure that all 1000 messages have been sent to Kafka before consuming.
    2. If you don't care about the ordering between messages, you can use multiple partitions and more consumers.
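
Suggestion 2 rests on the fact that, within one consumer group, each partition is read by at most one consumer at a time, so consumers beyond the partition count sit idle. The sketch below is a simplified, range-style assignment written for illustration only; `assign` and `idleConsumers` are hypothetical helpers, not Kafka API calls, and the partition count of topicA is not stated in this thread.

```scala
object PartitionAssignmentSketch {
  // Simplified range-style assignment: partitions are split into
  // contiguous slices, and the first (numPartitions % numConsumers)
  // consumers receive one extra partition.
  def assign(numPartitions: Int, numConsumers: Int): Map[Int, Seq[Int]] = {
    val perConsumer = numPartitions / numConsumers
    val extra = numPartitions % numConsumers
    (0 until numConsumers).map { i =>
      val start = i * perConsumer + math.min(i, extra)
      val count = perConsumer + (if (i < extra) 1 else 0)
      i -> (start until start + count)
    }.toMap
  }

  // Number of consumers that end up with no partition to read.
  def idleConsumers(numPartitions: Int, numConsumers: Int): Int =
    assign(numPartitions, numConsumers).count(_._2.isEmpty)

  def main(args: Array[String]): Unit = {
    println(s"1 partition, 10 consumers:   ${idleConsumers(1, 10)} idle")
    println(s"10 partitions, 10 consumers: ${idleConsumers(10, 10)} idle")
  }
}
```

With a single partition and the 10 consumers from the original configuration, nine consumers would have nothing to read; matching the partition count to the consumer count lets all of them work in parallel, at the cost of ordering across partitions.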



LiYuanJia
 