Posted to user@spark.apache.org by Rohit Rai <ro...@tuplejump.com> on 2014/07/11 17:18:25 UTC

Re: Using CQLSSTableWriter to batch load data from Spark to Cassandra.

Hi Gerard,

This has been on my todo list for a long time... I just published a Calliope snapshot
built against Hadoop 2.2.x. Take it for a spin if you get a chance -
You can get the jars from here -

   -
   https://oss.sonatype.org/service/local/repositories/snapshots/content/com/tuplejump/calliope_2.10/0.9.4-H2-SNAPSHOT/calliope_2.10-0.9.4-H2-SNAPSHOT.jar
   -
   https://oss.sonatype.org/service/local/repositories/snapshots/content/com/tuplejump/calliope-macros_2.10/0.9.4-H2-SNAPSHOT/calliope-macros_2.10-0.9.4-H2-SNAPSHOT.jar

Or to use from Maven -

<dependency>
  <groupId>com.tuplejump</groupId>
  <artifactId>calliope_2.10</artifactId>
  <version>0.9.4-H2-SNAPSHOT</version>
</dependency>

and SBT -

libraryDependencies += "com.tuplejump" %% "calliope" % "0.9.4-H2-SNAPSHOT"
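
Since these are snapshot builds, your build will also need the Sonatype
snapshots repository as a resolver if it isn't configured already, e.g. in SBT:

resolvers += "Sonatype OSS Snapshots" at "https://oss.sonatype.org/content/repositories/snapshots"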


It passes all the tests so I am assuming all is fine, but we haven't tested
it very extensively.

Regards,
Rohit


*Founder & CEO, **Tuplejump, Inc.*
____________________________
www.tuplejump.com
*The Data Engineering Platform*


On Fri, Jun 27, 2014 at 9:31 PM, Gerard Maas <ge...@gmail.com> wrote:

> Hi Rohit,
>
> Thanks for your message. We are currently on Spark 0.9.1, Cassandra 2.0.6
> and Calliope GA (we would love to try the pre-release version if you want
> beta testers :-). Our Hadoop version is CDH4.4 and of course our Spark
> assembly is compiled against it.
>
> We have got really interesting performance results from using Calliope and
> will probably try to compile it against Hadoop 2. Compared to the DataStax
> Java driver, out of the box, the Calliope lib gives us ~4.5x insert
> performance, with higher network and CPU usage (which is what we want in
> batch insert mode = fast).
>
> With additional code optimizations using the DataStax driver, we were able
> to reduce that gap to 2x but still Calliope was easier and faster to use.
>
> Will you be attending the Spark Summit? I'll be around.
>
> We'll be in touch in any case :-)
>
> -kr, Gerard.
>
>
>
> On Thu, Jun 26, 2014 at 11:03 AM, Rohit Rai <ro...@tuplejump.com> wrote:
>
>> Hi Gerard,
>>
>> What versions of Spark, Hadoop, Cassandra and Calliope are you
>> using? We never built Calliope against Hadoop 2, as we and our clients don't use
>> Hadoop in their deployments, or use it only as the infra component for Spark,
>> in which case H1/H2 doesn't make a difference for them.
>>
>> I know of at least one case where the user had built Calliope against 2.0
>> and was using it happily. If you need assistance with it, we are here to
>> help. Feel free to reach out to me directly and we can work out a solution
>> for you.
>>
>> Regards,
>> Rohit
>>
>>
>> *Founder & CEO, **Tuplejump, Inc.*
>> ____________________________
>> www.tuplejump.com
>> *The Data Engineering Platform*
>>
>>
>> On Thu, Jun 26, 2014 at 12:44 AM, Gerard Maas <ge...@gmail.com>
>> wrote:
>>
>>> Thanks Nick.
>>>
>>> We used the CassandraOutputFormat through Calliope. The Calliope API
>>> makes the CassandraOutputFormat quite accessible and is cool to work with.
>>> It worked fine at the prototype level, but we had Hadoop version conflicts
>>> when we put it in our Spark environment (using our Spark assembly compiled
>>> with CDH4.4). The conflict seems to be at the cassandra-all lib level,
>>> which is compiled against a different Hadoop version (v1).
>>>
>>> We could not get round that issue. (Any pointers in that direction?)
>>>
>>> That's why I'm trying the direct CQLSSTableWriter way but it looks
>>> blocked as well.
>>>
>>>  -kr, Gerard.
>>>
>>>
>>>
>>>
>>> On Wed, Jun 25, 2014 at 8:57 PM, Nick Pentreath <
>>> nick.pentreath@gmail.com> wrote:
>>>
>>>> Can you not use a Cassandra OutputFormat? It seems they have
>>>> BulkOutputFormat. An example of using it with Hadoop is here:
>>>> http://shareitexploreit.blogspot.com/2012/03/bulkloadto-cassandra-with-hadoop.html
>>>>
>>>> Using it with Spark will be similar to the examples:
>>>> https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/CassandraTest.scala
>>>> and
>>>> https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/CassandraCQLTest.scala
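>>>>
>>>> Roughly, wiring that up from Spark would look something like the sketch
>>>> below (untested, mirroring the CassandraTest example above; the host,
>>>> port, keyspace/table and the mutations RDD are placeholders you'd fill in):
>>>>
>>>> import java.nio.ByteBuffer
>>>> import org.apache.hadoop.mapreduce.Job
>>>> import org.apache.cassandra.hadoop.{ColumnFamilyOutputFormat, ConfigHelper}
>>>> import org.apache.cassandra.thrift.Mutation
>>>> import org.apache.spark.SparkContext._  // brings in the pair RDD save methods
>>>>
>>>> val job = new Job()
>>>> job.setOutputFormatClass(classOf[ColumnFamilyOutputFormat])
>>>> ConfigHelper.setOutputInitialAddress(job.getConfiguration, "cassandra-host")
>>>> ConfigHelper.setOutputRpcPort(job.getConfiguration, "9160")
>>>> ConfigHelper.setOutputColumnFamily(job.getConfiguration, "customer", "rawts")
>>>> ConfigHelper.setOutputPartitioner(job.getConfiguration, "Murmur3Partitioner")
>>>>
>>>> // mutationsRdd: RDD[(ByteBuffer, java.util.List[Mutation])] built from your data
>>>> mutationsRdd.saveAsNewAPIHadoopFile(
>>>>   "casoutput",  // dummy path, required by the API but not used by the output format
>>>>   classOf[ByteBuffer],
>>>>   classOf[java.util.List[Mutation]],
>>>>   classOf[ColumnFamilyOutputFormat],
>>>>   job.getConfiguration)
>>>>
>>>> BulkOutputFormat takes the same key/value types, so swapping it in should
>>>> be similar.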
>>>>
>>>>
>>>> On Wed, Jun 25, 2014 at 8:44 PM, Gerard Maas <ge...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> (My apologies for the cross-post from SO)
>>>>>
>>>>> I'm trying to create Cassandra SSTables from the results of a batch
>>>>> computation in Spark. Ideally, each partition should create the SSTable for
>>>>> the data it holds in order to parallelize the process as much as possible
>>>>> (and probably even stream it to the Cassandra ring as well).
>>>>>
>>>>> After the initial hurdles with the CQLSSTableWriter (like requiring
>>>>> the yaml file), I'm confronted now with this issue:
>>>>>
>>>>>
>>>>> java.lang.RuntimeException: Attempting to load already loaded column family customer.rawts
>>>>>     at org.apache.cassandra.config.Schema.load(Schema.java:347)
>>>>>     at org.apache.cassandra.config.Schema.load(Schema.java:112)
>>>>>     at org.apache.cassandra.io.sstable.CQLSSTableWriter$Builder.forTable(CQLSSTableWriter.java:336)
>>>>>
>>>>> I'm creating a writer on each parallel partition like this:
>>>>>
>>>>>
>>>>> def store(rdd: RDD[Message]) = {
>>>>>   rdd.foreachPartition( msgIterator => {
>>>>>     val writer = CQLSSTableWriter.builder()
>>>>>       .inDirectory("/tmp/cass")
>>>>>       .forTable(schema)
>>>>>       .using(insertSttmt)
>>>>>       .build()
>>>>>     msgIterator.foreach(msg => {...})
>>>>>   })
>>>>> }
>>>>>
>>>>> And if I'm reading the exception correctly, I can only create one
>>>>> writer per table in one JVM. Digging a bit further in the code, it looks
>>>>> like the Schema.load(...) singleton enforces that limitation.
>>>>>
>>>>> I guess writes to the writer will not be thread-safe, and even if
>>>>> they were, the contention created by having all the parallel tasks try to
>>>>> dump a few GB of data to disk at the same time would defeat the purpose of
>>>>> using SSTables for bulk upload anyway.
>>>>>
>>>>> So, are there ways to use the CQLSSTableWriter concurrently?
>>>>>
>>>>> If not, what is the next best option to load batch data at high
>>>>> throughput in Cassandra?
>>>>>
>>>>> Will the upcoming Spark-Cassandra integration help with this? (i.e.,
>>>>> should I just sit back, relax, and the problem will solve itself?)
>>>>>
>>>>> Thanks,
>>>>>
>>>>> Gerard.
>>>>>
>>>>
>>>>
>>>
>>
>