Posted to user@cassandra.apache.org by Faraz Mateen <fm...@an10.io> on 2018/03/03 07:17:22 UTC

Cassandra/Spark failing to process large table

Hi everyone,

I am trying to use Spark to process a large Cassandra table (~402 million
entries and 84 columns), but I am getting inconsistent results. The
original requirement was to copy some columns from this table to another
table. After copying the data, I noticed that some entries in the new
table were missing. To verify, I took a count of the large source table,
but I get different values each time. I tried the same queries on a
smaller table (~7 million records) and the results were fine.

First, I attempted to take the count using PySpark. Here is my script:

from pyspark.sql import SparkSession

# sourcetable and sourcekeyspace are variables holding the source
# table and keyspace names
spark = SparkSession.builder.appName("Datacopy App").getOrCreate()
df = (spark.read.format("org.apache.spark.sql.cassandra")
      .options(table=sourcetable, keyspace=sourcekeyspace)
      .load()
      .cache())
df.createOrReplaceTempView("data")
vgDF = spark.sql("select count(1) from data")
vgDF.show(10)

The spark-submit command is as follows:

~/spark-2.1.0-bin-hadoop2.7/bin/spark-submit \
  --master spark://10.128.0.18:7077 \
  --packages datastax:spark-cassandra-connector:2.0.1-s_2.11 \
  --conf spark.cassandra.connection.host="10.128.1.1,10.128.1.2,10.128.1.3" \
  --conf spark.storage.memoryFraction=1 \
  --conf spark.local.dir=/media/db/ \
  --executor-memory 10G \
  --num-executors 6 \
  --executor-cores 2 \
  --total-executor-cores 18 \
  pyspark_script.py

The spark-submit job above takes ~90 minutes to complete. I ran it
three times, and here are the counts I got:

Spark iteration 1:  402273852
Spark iteration 2:  402273884
Spark iteration 3:  402274209

Spark does not show any error or exception during the entire process. I ran
the same query in cqlsh thrice and got different results again:

Cqlsh iteration 1:   402273598
Cqlsh iteration 2:   402273499
Cqlsh iteration 3:   402273515

I am unable to find out why I am getting different outcomes from the same
query. The Cassandra system log (*/var/log/cassandra/system.log*) shows
the following error message, but only once:

ERROR [SSTableBatchOpen:3] 2018-02-27 09:48:23,592 CassandraDaemon.java:226 - Exception in thread Thread[SSTableBatchOpen:3,5,main]
java.lang.AssertionError: Stats component is missing for sstable /media/db/datakeyspace/sensordata1-acfa7880acba11e782fd9bf3ae460699/mc-58617-big
        at org.apache.cassandra.io.sstable.format.SSTableReader.open(SSTableReader.java:460) ~[apache-cassandra-3.9.jar:3.9]
        at org.apache.cassandra.io.sstable.format.SSTableReader.open(SSTableReader.java:375) ~[apache-cassandra-3.9.jar:3.9]
        at org.apache.cassandra.io.sstable.format.SSTableReader$4.run(SSTableReader.java:536) ~[apache-cassandra-3.9.jar:3.9]
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[na:1.8.0_131]
        at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[na:1.8.0_131]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) ~[na:1.8.0_131]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_131]
        at java.lang.Thread.run(Thread.java:748) [na:1.8.0_131]

*Versions:*

   - Cassandra 3.9
   - Spark 2.1.0
   - Datastax's spark-cassandra-connector 2.0.1
   - Scala version 2.11

*Cluster:*

   - Spark is set up with 3 worker nodes and 1 master node.
   - The 3 worker nodes also host the Cassandra cluster.
   - Each worker node has 8 CPU cores and 40 GB RAM.

Any help will be greatly appreciated.

Thanks,
Faraz

Re: Cassandra/Spark failing to process large table

Posted by kurt greaves <ku...@instaclustr.com>.
Note that read repairs only occur for QUORUM/equivalent and higher, and
otherwise only with a 10% (default) chance on anything less than QUORUM
(ONE/LOCAL_ONE). This is configured at the table level through the
dclocal_read_repair_chance and read_repair_chance settings (which are going
away in 4.0). So if you read at LOCAL_ONE, it would have been chance that
caused the read repair. Don't expect it to happen for every read (unless
you configure it to, or use >=QUORUM).
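
To make that concrete, here is a toy model (an illustrative sketch only, not Cassandra's actual code path) of the rule described above: reads at QUORUM or higher repair mismatches among the contacted replicas, while reads below QUORUM trigger a read repair only with the table's configured chance.

```python
import random

# Toy model of table-level read repair chance (illustrative only,
# not Cassandra's real implementation).
QUORUM_OR_HIGHER = {"QUORUM", "LOCAL_QUORUM", "EACH_QUORUM", "ALL"}

def read_repair_fires(consistency_level, chance=0.1, rng=random):
    """Would a read at this consistency level trigger a read repair?"""
    if consistency_level in QUORUM_OR_HIGHER:
        # Mismatches among the contacted replicas are always repaired.
        return True
    # ONE/LOCAL_ONE and the like: probabilistic, governed by the
    # dclocal_read_repair_chance / read_repair_chance table settings
    # (default 0.1, removed in 4.0).
    return rng.random() < chance
```

In this model, read_repair_fires("QUORUM") is always True, while a LOCAL_ONE read with the default settings only repairs about 1 read in 10.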

Re: Cassandra/Spark failing to process large table

Posted by Faraz Mateen <fm...@an10.io>.
Hi Ben,

That makes sense. I also read about "read repairs": once an inconsistent
record is read, Cassandra synchronizes its replicas on the other nodes as
well. I ran the same Spark query again, this time with the default
consistency level (LOCAL_ONE), and the result was correct.

Thanks again for the help.

Thanks,
Faraz


Re: Cassandra/Spark failing to process large table

Posted by Ben Slater <be...@instaclustr.com>.
Hi Faraz

Yes, it likely does mean there is inconsistency in the replicas. However,
you shouldn't be too freaked out about it: Cassandra is designed to allow
for this inconsistency to occur, and the consistency levels allow you to
achieve consistent results despite replicas not being consistent. To keep
your replicas as consistent as possible (which is still a good thing), you
do need to run repairs regularly (once a week is the standard
recommendation for full repairs). Inconsistency can result from a whole
range of conditions, from nodes being down, to the cluster being
overloaded, to network issues.
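
As a sketch of what regular repairs can look like operationally: the `nodetool repair -full <keyspace>` invocation below is the standard one, but the scheduling itself would normally live in cron or a tool like Cassandra Reaper, and the keyspace name is just the one from this thread.

```python
import subprocess

def run_full_repair(keyspace, nodetool="nodetool", dry_run=True):
    """Run a full (non-incremental) repair for one keyspace.

    With dry_run=True this only returns the command line, so the shape
    of the invocation can be checked without a live cluster.
    """
    cmd = [nodetool, "repair", "-full", keyspace]
    if dry_run:
        return " ".join(cmd)
    return subprocess.run(cmd, check=True)

# e.g. invoked weekly per keyspace from a scheduler:
# run_full_repair("datakeyspace", dry_run=False)
```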

Cheers
Ben



*Ben Slater*

*Chief Product Officer <https://www.instaclustr.com/>*

<https://www.facebook.com/instaclustr>   <https://twitter.com/instaclustr>
<https://www.linkedin.com/company/instaclustr>

Read our latest technical blog posts here
<https://www.instaclustr.com/blog/>.

This email has been sent on behalf of Instaclustr Pty. Limited (Australia)
and Instaclustr Inc (USA).

This email and any attachments may contain confidential and legally
privileged information.  If you are not the intended recipient, do not copy
or disclose its content, but please reply to this email immediately and
highlight the error to the sender and then immediately delete the message.

Re: Cassandra/Spark failing to process large table

Posted by Faraz Mateen <fm...@an10.io>.
Thanks a lot for the response.

Setting consistency to ALL/TWO started giving me consistent count results
in both cqlsh and Spark. As expected, my query time has increased by about
1.5x (before, it took ~1.6 hours; with consistency level ALL, the same
query takes ~2.4 hours to complete).

Does this mean my replicas are out of sync? When I first started pushing
data to Cassandra, I had a single-node setup. Then I added two more nodes,
changed the replication factor to 2, and ran nodetool repair to distribute
the data to all the nodes. So, according to my understanding, the nodes
should have passively replicated data among themselves to remain in sync.

Do I need to run repairs repeatedly to keep data in sync?
How can I further debug why my replicas were not in sync before?

Thanks,
Faraz


Re: Cassandra/Spark failing to process large table

Posted by Ben Slater <be...@instaclustr.com>.
Both cqlsh and the Spark Cassandra connector query at consistency level
ONE (LOCAL_ONE for the Spark connector) by default, so if there is any
inconsistency in your replicas this can result in inconsistent query
results.

See http://cassandra.apache.org/doc/latest/tools/cqlsh.html and
https://github.com/datastax/spark-cassandra-connector/blob/master/doc/reference.md
for info on how to change consistency. If you are unsure of how consistent
the on-disk replicas are (e.g. if you have been writing at CL ONE or
haven't run repairs), then using consistency level ALL should give you the
most consistent results, but it requires all replicas to be available for
the query to succeed. If you are using QUORUM for your writes, then
querying at QUORUM or LOCAL_QUORUM as appropriate should give you
consistent results.
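
As a sketch of the Spark side of that: the connector's reads default to LOCAL_ONE and can be overridden with the `spark.cassandra.input.consistency.level` property (the property name is taken from the connector's reference.md; the host list below is the one from the original post). In cqlsh, the equivalent is running `CONSISTENCY ALL;` before the count.

```python
def consistent_read_conf(consistency="ALL"):
    """Spark conf entries that force the connector's read consistency."""
    return {
        "spark.cassandra.connection.host": "10.128.1.1,10.128.1.2,10.128.1.3",
        "spark.cassandra.input.consistency.level": consistency,
    }

# Usage against a live cluster (requires pyspark, so shown commented):
# from pyspark.sql import SparkSession
# builder = SparkSession.builder.appName("ConsistentCount")
# for key, value in consistent_read_conf("ALL").items():
#     builder = builder.config(key, value)
# spark = builder.getOrCreate()
# df = (spark.read.format("org.apache.spark.sql.cassandra")
#       .options(table=sourcetable, keyspace=sourcekeyspace).load())
# print(df.count())
```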

Cheers
Ben

On Sun, 4 Mar 2018 at 00:59 Kant Kodali <ka...@peernova.com> wrote:

> The fact that cqlsh itself gives different results tells me that this has
> nothing to do with Spark. Moreover, the Spark results are monotonically
> increasing, which seems more consistent than cqlsh, so I believe Spark
> can be taken out of the equation.
>
> Now, while you are running these queries, is there another process or
> thread that is also writing at the same time? If yes, then your results
> are fine; but if not, you may want to try nodetool flush first and then
> run these iterations again.
>
> Thanks!
>
>
> On Fri, Mar 2, 2018 at 11:17 PM, Faraz Mateen <fm...@an10.io> wrote:
>
>> Hi everyone,
>>
>> I am trying to use spark to process a large cassandra table (~402 million
>> entries and 84 columns) but I am getting inconsistent results. Initially
>> the requirement was to copy some columns from this table to another table.
>> After copying the data, I noticed that some entries in the new table were
>> missing. To verify that I took count of the large source table but I am
>> getting different values each time. I tried the queries on a smaller table
>> (~7 million records) and the results were fine.
>>
>> Initially, I attempted to take count using pyspark. Here is my pyspark
>> script:
>>
>> spark = SparkSession.builder.appName("Datacopy App").getOrCreate()
>> df = spark.read.format("org.apache.spark.sql.cassandra").options(table=sourcetable, keyspace=sourcekeyspace).load().cache()
>> df.createOrReplaceTempView("data")
>> query = ("select count(1) from data " )
>> vgDF = spark.sql(query)
>> vgDF.show(10)
>>
>> Spark submit command is as follows:
>>
>> ~/spark-2.1.0-bin-hadoop2.7/bin/spark-submit --master spark://10.128.0.18:7077 --packages datastax:spark-cassandra-connector:2.0.1-s_2.11 --conf spark.cassandra.connection.host="10.128.1.1,10.128.1.2,10.128.1.3" --conf "spark.storage.memoryFraction=1" --conf spark.local.dir=/media/db/ --executor-memory 10G --num-executors=6 --executor-cores=2 --total-executor-cores 18 pyspark_script.py
>>
>> The above spark submit process takes ~90 minutes to complete. I ran it
>> three times and here are the counts I got:
>>
>> Spark iteration 1:  402273852
>> Spark iteration 2:  402273884
>> Spark iteration 3:  402274209
>>
>> Spark does not show any error or exception during the entire process. I
>> ran the same query in cqlsh thrice and got different results again:
>>
>> Cqlsh iteration 1:   402273598
>> Cqlsh iteration 2:   402273499
>> Cqlsh iteration 3:   402273515
>>
>> I am unable to find out why I am getting different outcomes from the same
>> query. The Cassandra system log (*/var/log/cassandra/system.log*) showed
>> the following error message just once:
>>
>> ERROR [SSTableBatchOpen:3] 2018-02-27 09:48:23,592 CassandraDaemon.java:226 - Exception in thread Thread[SSTableBatchOpen:3,5,main]
>> java.lang.AssertionError: Stats component is missing for sstable /media/db/datakeyspace/sensordata1-acfa7880acba11e782fd9bf3ae460699/mc-58617-big
>>         at org.apache.cassandra.io.sstable.format.SSTableReader.open(SSTableReader.java:460) ~[apache-cassandra-3.9.jar:3.9]
>>         at org.apache.cassandra.io.sstable.format.SSTableReader.open(SSTableReader.java:375) ~[apache-cassandra-3.9.jar:3.9]
>>         at org.apache.cassandra.io.sstable.format.SSTableReader$4.run(SSTableReader.java:536) ~[apache-cassandra-3.9.jar:3.9]
>>         at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[na:1.8.0_131]
>>         at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[na:1.8.0_131]
>>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) ~[na:1.8.0_131]
>>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_131]
>>         at java.lang.Thread.run(Thread.java:748) [na:1.8.0_131]
>>
>> *Versions:*
>>
>>    - Cassandra 3.9
>>    - Spark 2.1.0
>>    - Datastax's spark-cassandra-connector 2.0.1
>>    - Scala version 2.11
>>
>> *Cluster:*
>>
>>    - Spark cluster with 3 worker nodes and 1 master node.
>>    - The 3 worker nodes also host the Cassandra cluster.
>>    - Each worker node has 8 CPU cores and 40 GB RAM.
>>
>> Any help will be greatly appreciated.
>>
>> Thanks,
>> Faraz
>>
>
> --


*Ben Slater*

*Chief Product Officer <https://www.instaclustr.com/>*


Re: Cassandra/Spark failing to process large table

Posted by Kant Kodali <ka...@peernova.com>.
The fact that cqlsh itself gives different results tells me that this has
nothing to do with Spark. Moreover, the Spark results are monotonically
increasing, which seems more consistent than cqlsh, so I believe Spark can
be taken out of the equation.

Now, while you are running these queries, is there another process or
thread that is also writing at the same time? If yes, then your results are
expected, but if not, you may want to try nodetool flush first and then run
these iterations again.
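To illustrate the mechanism that can produce this symptom, here is a toy
sketch (plain Python, not Cassandra code; the node names and per-replica
counts are hypothetical, loosely modeled on the cqlsh numbers above): at a
weak read consistency level such as ONE, each query is answered by a single
replica, so if the replicas have diverged (unflushed or unrepaired data),
repeated counts disagree.

```python
import random

# Hypothetical per-replica row counts for one table; the replicas have
# diverged, e.g. because some writes were never repaired or flushed.
replica_counts = {"node1": 402273598, "node2": 402273499, "node3": 402273515}

def count_at_cl_one(rng: random.Random) -> int:
    """CL=ONE: a single replica answers, so its local count is the result."""
    return replica_counts[rng.choice(sorted(replica_counts))]

rng = random.Random(42)
results = {count_at_cl_one(rng) for _ in range(50)}
# With diverged replicas, repeated identical queries return more than one
# distinct count, matching the varying cqlsh results in the thread.
print(len(results) > 1)
```

If the replicas agree (for example after running nodetool repair, or when
reading with CONSISTENCY ALL in cqlsh so every replica must answer),
repeated counts should converge on a single value.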

Thanks!


On Fri, Mar 2, 2018 at 11:17 PM, Faraz Mateen <fm...@an10.io> wrote:

> [original message quoted in full above; snipped]