Posted to user@cassandra.apache.org by Rajesh Radhakrishnan <Ra...@phe.gov.uk> on 2016/11/07 16:51:05 UTC

Cassandra Python Driver : execute_async consumes lots of memory?

Hi

We are trying to insert millions of rows of data into a table by executing batches of prepared statements.

We found that when we use 'session.execute(batch)', it writes more of the data, but very, very slowly.
However, if we use 'session.execute_async(batch)' it is relatively fast, but once it reaches a certain limit it fills up the memory of the Python process.
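
(To illustrate the difference as we see it — 'session' here is an already-connected driver session:)

===CODE ======================================
# execute() blocks until the server acknowledges the write, so memory
# stays flat but every batch costs a full network round trip:
session.execute(batch)

# execute_async() returns a ResponseFuture immediately; the request, the
# batch and the future all stay in memory until the write completes:
future = session.execute_async(batch)
===CODE ======================================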

Our implementation:
Cassandra 3.7.0 cluster ring with 3 nodes (RedHat, 150 GB disk, 8 GB of RAM each)

Python 2.7.12

Does anybody know how to reduce the memory use of the Cassandra Python driver API, specifically for execute_async? Thank you!



===CODE ======================================
      sqlQuery = "INSERT INTO tableV  (id, sample_name, pos, ref_base, var_base) values (?,?,?,?,?)"
       random_numbers_for_strains = random.sample(xrange(1,300), 200)
        random_numbers = random.sample(xrange(1,2000000), 200000)

        totalCounter  = 0
        c = 0
        time_init = time.time()
        for random_number_strain in random_numbers_for_strains:

            sample_name = None
            sample_name = 'sample'+str(random_number_strain)

            cassandraCluster = CassandraCluster.CassandraCluster()
            cluster = cassandraCluster.create_cluster_with_protocol2()
            session = cluster.connect();
            #session.default_timeout = 1800
            session.set_keyspace(self.KEYSPACE_NAME)

            preparedStatement = session.prepare(sqlQuery)

            counter = 0
            c = c + 1

            for random_number in random_numbers:

                totalCounter += 1
                if counter == 0 :
                    batch = BatchStatement()

                counter += 1
                if totalCounter % 10000 == 0 :
                    print "Total Count "+ str(totalCounter)

                batch.add(preparedStatement.bind([ uuid.uuid1(), sample_name, random_number, random.choice('GT'), random.choice('AC')]))
                if counter % 50 == 0:
                    session.execute_async(batch)
                    #session.execute(batch)
                    batch = None
                    del batch
                    counter = 0

            time.sleep(2);
            session.cluster.shutdown()
            random_number= None
            del random_number
            preparedStatement = None
            session = None
            del session
            cluster = None
            del cluster
            cassandraCluster = None
            del cassandraCluster
            gc.collect()

===CODE ======================================



Kind regards,
Rajesh Radhakrishnan



RE: Cassandra Python Driver : execute_async consumes lots of memory?

Posted by Rajesh Radhakrishnan <Ra...@phe.gov.uk>.
Hi Lahiru,

Thank you for the reply. I will try reducing the batch size to 20 and see how much I can cut the memory usage.

I might try Spark streaming too. Cheers!


Kind regards,
Rajesh R

________________________________
From: Lahiru Gamathige [lahiru@highfive.com]
Sent: 07 November 2016 17:10
To: user@cassandra.apache.org
Subject: Re: Cassandra Python Driver : execute_async consumes lots of memory?




RE: Cassandra Python Driver : execute_async consumes lots of memory?

Posted by Rajesh Radhakrishnan <Ra...@phe.gov.uk>.
Hi Lahiru,

Great! You know what, reducing the batch size from 50 to 20 solved my issue.

Thank you very much, good job! The memory issue is solved.

Next I will try using Spark to speed it up.
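
For reference, the only change was the flush threshold in the inner loop (sketched here with the size pulled out into a constant; the name BATCH_SIZE is just illustrative):

===CODE ======================================
BATCH_SIZE = 20  # was 50; smaller batches keep each async request lighter

if counter % BATCH_SIZE == 0:
    session.execute_async(batch)
    batch = None
    counter = 0
===CODE ======================================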


Kind regards,
Rajesh Radhakrishnan

________________________________
From: Lahiru Gamathige [lahiru@highfive.com]
Sent: 07 November 2016 17:10
To: user@cassandra.apache.org
Subject: Re: Cassandra Python Driver : execute_async consumes lots of memory?




Re: Cassandra Python Driver : execute_async consumes lots of memory?

Posted by Lahiru Gamathige <la...@highfive.com>.
Hi Rajesh,

By looking at your code I can see why the memory grows: you write big batches
asynchronously, so you end up with a large number of in-flight batch
statements, and they all end up slowing things down. We recently migrated some
data to C*. What we did was create a data stream, write in batches, and use a
library that is sensitive to the stream's back-pressure; in your
implementation there is no back-pressure to control this. We migrated the data
pretty fast, keeping the CPU at 100% constantly to get the highest throughput
(we used Scala with akka-streams and phantom-websudo).

I would consider using some streaming API to implement this. When you do
batching, also make sure you don't exceed the maximum batch size, or things
will slow down anyway.
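
If you stay with the Python driver rather than moving to a streaming library,
the same back-pressure idea can be sketched with a bounded semaphore around
execute_async (a sketch only; the limit of 64 and the helper name
throttled_execute are illustrative, not from our setup):

===CODE ======================================
import threading

MAX_IN_FLIGHT = 64  # tune for your cluster; this value is only a guess
semaphore = threading.BoundedSemaphore(MAX_IN_FLIGHT)

def _release(_result):
    # Runs on success or failure; frees a slot for the next request.
    semaphore.release()

def throttled_execute(session, statement):
    semaphore.acquire()  # blocks once MAX_IN_FLIGHT writes are pending
    future = session.execute_async(statement)
    future.add_callbacks(_release, _release)
    return future
===CODE ======================================

Each call then blocks as soon as MAX_IN_FLIGHT writes are outstanding, so
unconsumed futures can never pile up in memory.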

Lahiru

On Mon, Nov 7, 2016 at 8:51 AM, Rajesh Radhakrishnan <Rajesh.Radhakrishnan@phe.gov.uk> wrote:

> [...]