You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@spark.apache.org by Jerry <je...@gmail.com> on 2016/02/16 22:29:33 UTC

Optimize the performance of inserting data to Cassandra with Kafka and Spark Streaming

Hello, 

I have questions using Spark streaming to consume data from Kafka and insert
to Cassandra database.

5 AWS instances (each one does have 8 cores, 30GB memory) for Spark, Hadoop,
Cassandra
Scala: 2.10.5
Spark: 1.2.2
Hadoop: 1.2.1
Cassandra 2.0.18

3 AWS instances for Kafka cluster (each one does have 8 cores, 30GB memory)
Kafka: 0.8.2.1
Zookeeper: 3.4.6

Other configurations:
batchInterval = 6 Seconds
blockInterval = 1500 millis
spark.locality.wait = 500 millis
#Consumers = 10

There are two columns in the cassandra table keySpaceOfTopicA.tableOfTopicA,
"createdtime" and "log".

Here is a piece of codes,

@transient val kstreams = (1 to numConsumers.toInt).map { _ =>
KafkaUtils.createStream(ssc, zkeeper, groupId, Map("topicA"->1),   
StorageLevel.MEMORY_AND_DISK_SER)
        .map(_._2.toString).map(Tuple1(_))
        .map{case(log) => (System.currentTimeMillis(), log)}
}
@transient val unifiedMessage = ssc.union(kstreams)

unifiedMessage.saveToCassandra("keySpaceOfTopicA", "tableOfTopicA",
SomeColumns("createdtime", "log"))

I created a producer and send messages to Brokers (1000 messages/per time)

But the Cassandra can only be inserted about 100 messages in each round of
test.
Can anybody give me advices why the other messages (about 900 message) can't
be consumed? 
How do I configure and tune the parameters in order to improve the
throughput of consumers?  

Thank you very much for your reading and suggestions in advances.

Jerry Wong



--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Optimize-the-performance-of-inserting-data-to-Cassandra-with-Kafka-and-Spark-Streaming-tp26244.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
For additional commands, e-mail: user-help@spark.apache.org


Re: Optimize the performance of inserting data to Cassandra with Kafka and Spark Streaming

Posted by Jerry <je...@gmail.com>.
Rado,

Yes. you are correct. A lots of messages are created almost in the same
time (even use milliseconds). I changed to use "UUID.randomUUID()" with
which all messages can be inserted in the Cassandra table without time lag.

Thank you very much!
Jerry Wong

On Wed, Feb 17, 2016 at 1:50 AM, radoburansky [via Apache Spark User List] <
ml-node+s1001560n26246h73@n3.nabble.com> wrote:

> Hi Jerry,
>
> How do you know that only 100 messages are inserted? What is the primary
> key of the "tableOfTopicA" Cassandra table? Isn't it possible that you
> map more messages to the same primamary key and therefore they overwrite
> each other in Cassandra?
>
> Regards
>
> Rado
>
> On Tue, Feb 16, 2016 at 10:29 PM, Jerry [via Apache Spark User List] <[hidden
> email] <http:///user/SendEmail.jtp?type=node&node=26246&i=0>> wrote:
>
>> Hello,
>>
>> I have questions using Spark streaming to consume data from Kafka and
>> insert to Cassandra database.
>>
>> 5 AWS instances (each one does have 8 cores, 30GB memory) for Spark,
>> Hadoop, Cassandra
>> Scala: 2.10.5
>> Spark: 1.2.2
>> Hadoop: 1.2.1
>> Cassandra 2.0.18
>>
>> 3 AWS instances for Kafka cluster (each one does have 8 cores, 30GB
>> memory)
>> Kafka: 0.8.2.1
>> Zookeeper: 3.4.6
>>
>> Other configurations:
>> batchInterval = 6 Seconds
>> blockInterval = 1500 millis
>> spark.locality.wait = 500 millis
>> #Consumers = 10
>>
>> There are two columns in the cassandra table
>> keySpaceOfTopicA.tableOfTopicA, "createdtime" and "log".
>>
>> Here is a piece of codes,
>>
>> @transient val kstreams = (1 to numConsumers.toInt).map { _ =>
>> KafkaUtils.createStream(ssc, zkeeper, groupId, Map("topicA"->1),
>>  StorageLevel.MEMORY_AND_DISK_SER)
>>         .map(_._2.toString).map(Tuple1(_))
>>         .map{case(log) => (System.currentTimeMillis(), log)}
>> }
>> @transient val unifiedMessage = ssc.union(kstreams)
>>
>> unifiedMessage.saveToCassandra("keySpaceOfTopicA", "tableOfTopicA",
>> SomeColumns("createdtime", "log"))
>>
>> I created a producer and send messages to Brokers (1000 messages/per
>> time)
>>
>> But the Cassandra can only be inserted about 100 messages in each round
>> of test.
>> Can anybody give me advices why the other messages (about 900 message)
>> can't be consumed?
>> How do I configure and tune the parameters in order to improve the
>> throughput of consumers?
>>
>> Thank you very much for your reading and suggestions in advances.
>>
>> Jerry Wong
>>
>> ------------------------------
>> If you reply to this email, your message will be added to the discussion
>> below:
>>
>> http://apache-spark-user-list.1001560.n3.nabble.com/Optimize-the-performance-of-inserting-data-to-Cassandra-with-Kafka-and-Spark-Streaming-tp26244.html
>> To start a new topic under Apache Spark User List, email [hidden email]
>> <http:///user/SendEmail.jtp?type=node&node=26246&i=1>
>> To unsubscribe from Apache Spark User List, click here.
>> NAML
>> <http://apache-spark-user-list.1001560.n3.nabble.com/template/NamlServlet.jtp?macro=macro_viewer&id=instant_html%21nabble%3Aemail.naml&base=nabble.naml.namespaces.BasicNamespace-nabble.view.web.template.NabbleNamespace-nabble.naml.namespaces.BasicNamespace-nabble.view.web.template.NabbleNamespace-nabble.view.web.template.NodeNamespace&breadcrumbs=notify_subscribers%21nabble%3Aemail.naml-instant_emails%21nabble%3Aemail.naml-send_instant_email%21nabble%3Aemail.naml>
>>
>
>
>
> ------------------------------
> If you reply to this email, your message will be added to the discussion
> below:
>
> http://apache-spark-user-list.1001560.n3.nabble.com/Optimize-the-performance-of-inserting-data-to-Cassandra-with-Kafka-and-Spark-Streaming-tp26244p26246.html
> To start a new topic under Apache Spark User List, email
> ml-node+s1001560n1h78@n3.nabble.com
> To unsubscribe from Optimize the performance of inserting data to
> Cassandra with Kafka and Spark Streaming, click here
> <http://apache-spark-user-list.1001560.n3.nabble.com/template/NamlServlet.jtp?macro=unsubscribe_by_code&node=26244&code=amVycnkua2luZzIud29uZ0BnbWFpbC5jb218MjYyNDR8MTYwMzcyMjg3MQ==>
> .
> NAML
> <http://apache-spark-user-list.1001560.n3.nabble.com/template/NamlServlet.jtp?macro=macro_viewer&id=instant_html%21nabble%3Aemail.naml&base=nabble.naml.namespaces.BasicNamespace-nabble.view.web.template.NabbleNamespace-nabble.view.web.template.NodeNamespace&breadcrumbs=notify_subscribers%21nabble%3Aemail.naml-instant_emails%21nabble%3Aemail.naml-send_instant_email%21nabble%3Aemail.naml>
>




--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Optimize-the-performance-of-inserting-data-to-Cassandra-with-Kafka-and-Spark-Streaming-tp26244p26252.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

Re: Optimize the performance of inserting data to Cassandra with Kafka and Spark Streaming

Posted by radoburansky <ra...@gmail.com>.
Hi Jerry,

How do you know that only 100 messages are inserted? What is the primary
key of the "tableOfTopicA" Cassandra table? Isn't it possible that you map
more messages to the same primamary key and therefore they overwrite each
other in Cassandra?

Regards

Rado

On Tue, Feb 16, 2016 at 10:29 PM, Jerry [via Apache Spark User List] <
ml-node+s1001560n26244h87@n3.nabble.com> wrote:

> Hello,
>
> I have questions using Spark streaming to consume data from Kafka and
> insert to Cassandra database.
>
> 5 AWS instances (each one does have 8 cores, 30GB memory) for Spark,
> Hadoop, Cassandra
> Scala: 2.10.5
> Spark: 1.2.2
> Hadoop: 1.2.1
> Cassandra 2.0.18
>
> 3 AWS instances for Kafka cluster (each one does have 8 cores, 30GB
> memory)
> Kafka: 0.8.2.1
> Zookeeper: 3.4.6
>
> Other configurations:
> batchInterval = 6 Seconds
> blockInterval = 1500 millis
> spark.locality.wait = 500 millis
> #Consumers = 10
>
> There are two columns in the cassandra table
> keySpaceOfTopicA.tableOfTopicA, "createdtime" and "log".
>
> Here is a piece of codes,
>
> @transient val kstreams = (1 to numConsumers.toInt).map { _ =>
> KafkaUtils.createStream(ssc, zkeeper, groupId, Map("topicA"->1),
>  StorageLevel.MEMORY_AND_DISK_SER)
>         .map(_._2.toString).map(Tuple1(_))
>         .map{case(log) => (System.currentTimeMillis(), log)}
> }
> @transient val unifiedMessage = ssc.union(kstreams)
>
> unifiedMessage.saveToCassandra("keySpaceOfTopicA", "tableOfTopicA",
> SomeColumns("createdtime", "log"))
>
> I created a producer and send messages to Brokers (1000 messages/per time)
>
> But the Cassandra can only be inserted about 100 messages in each round of
> test.
> Can anybody give me advices why the other messages (about 900 message)
> can't be consumed?
> How do I configure and tune the parameters in order to improve the
> throughput of consumers?
>
> Thank you very much for your reading and suggestions in advances.
>
> Jerry Wong
>
> ------------------------------
> If you reply to this email, your message will be added to the discussion
> below:
>
> http://apache-spark-user-list.1001560.n3.nabble.com/Optimize-the-performance-of-inserting-data-to-Cassandra-with-Kafka-and-Spark-Streaming-tp26244.html
> To start a new topic under Apache Spark User List, email
> ml-node+s1001560n1h54@n3.nabble.com
> To unsubscribe from Apache Spark User List, click here
> <http://apache-spark-user-list.1001560.n3.nabble.com/template/NamlServlet.jtp?macro=unsubscribe_by_code&node=1&code=cmFkb2J1cmFuc2t5QGdtYWlsLmNvbXwxfC03MDA2NjE5MjQ=>
> .
> NAML
> <http://apache-spark-user-list.1001560.n3.nabble.com/template/NamlServlet.jtp?macro=macro_viewer&id=instant_html%21nabble%3Aemail.naml&base=nabble.naml.namespaces.BasicNamespace-nabble.view.web.template.NabbleNamespace-nabble.naml.namespaces.BasicNamespace-nabble.view.web.template.NabbleNamespace-nabble.view.web.template.NodeNamespace&breadcrumbs=notify_subscribers%21nabble%3Aemail.naml-instant_emails%21nabble%3Aemail.naml-send_instant_email%21nabble%3Aemail.naml>
>




--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Optimize-the-performance-of-inserting-data-to-Cassandra-with-Kafka-and-Spark-Streaming-tp26244p26246.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.