Posted to user@spark.apache.org by Soumitra Kumar <ku...@gmail.com> on 2014/02/25 08:15:09 UTC

HBase row count

I have code that reads an HBase table and counts the number of rows
containing a field.

    import org.apache.hadoop.hbase.client.Result
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable
    import org.apache.hadoop.hbase.util.Bytes
    import org.apache.spark.rdd.RDD

    def readFields(rdd: RDD[(ImmutableBytesWritable, Result)]): RDD[List[Array[Byte]]] = {
        rdd.flatMap { kv =>
            // Set of interesting keys for this use case
            val keys = List("src")
            val cf = Bytes.toBytes("cf")

            // getValue returns null when the row has no such column
            val data = keys.map(key => kv._2.getValue(cf, Bytes.toBytes(key)))

            // Keep the row only if at least one interesting column is present
            if (data.exists(_ != null))
                Some(data)
            else
                None
        }
    }

    def main(args: Array[String]): Unit = {
        val hBaseRDD = init(args)
        // hBaseRDD.cache()

        println("**** Initial row count " + hBaseRDD.count())
        println("**** Rows with interesting fields " + readFields(hBaseRDD).count())
    }


I am running on a single-node CDH installation.

As it is, the job takes around 2.5 minutes. But if I comment out
'println("**** Initial row count " + hBaseRDD.count())', it takes around
1.5 minutes.

Is it doing the HBase scan twice, once for each 'count' call? How do I
improve it?

Thanks,
-Soumitra.

Re: HBase row count

Posted by Koert Kuipers <ko...@tresata.com>.
I find them both somewhat confusing, actually.
* RDD.cache is lazy, and mutates the RDD in place
* RDD.unpersist has the direct effect of unloading, and also mutates the RDD
in place to disable future lazy caching

I have found that if I need to unload an RDD from memory but still want it
to be cached again in the future, I need to do:
rdd.unpersist().cache()
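
The in-place mutation described above can be observed through the RDD's
storage level. This is a minimal sketch, assuming a local Spark master and a
small parallelized collection standing in for a real dataset; the object name
UnpersistDemo and the data are illustrative and not from this thread.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel

// Hypothetical demo object; records the storage level after each step.
object UnpersistDemo {
  def run(): Seq[StorageLevel] = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("unpersist-demo")
    val sc = new SparkContext(conf)
    try {
      val rdd = sc.parallelize(1 to 100)

      val initial = rdd.getStorageLevel        // no caching requested yet

      rdd.cache()                              // lazy: only marks the RDD
      val marked = rdd.getStorageLevel

      rdd.count()                              // first action fills the cache
      rdd.unpersist()                          // drops the blocks and clears the mark
      val afterUnpersist = rdd.getStorageLevel

      rdd.unpersist().cache()                  // unload now, but re-mark for future caching
      val remarked = rdd.getStorageLevel

      Seq(initial, marked, afterUnpersist, remarked)
    } finally {
      sc.stop()
    }
  }
}
```

Both calls return the same RDD instance, which is why they can be chained.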






On Tue, Feb 25, 2014 at 6:50 AM, Cheng Lian <rh...@gmail.com> wrote:

> BTW, unlike RDD.cache(), the reverse operation RDD.unpersist() is not
> lazy, which is somewhat confusing...

Re: HBase row count

Posted by Cheng Lian <rh...@gmail.com>.
BTW, unlike RDD.cache(), the reverse operation RDD.unpersist() is not lazy,
which is somewhat confusing...


On Tue, Feb 25, 2014 at 7:48 PM, Cheng Lian <rh...@gmail.com> wrote:

> RDD.cache() is a lazy operation: the method itself doesn't perform the
> cache operation; it just asks the Spark runtime to cache the content of
> the RDD when the first action is invoked.  In your case, the first action
> is the first count() call, which conceptually does 3 things:
>
>    1. Performs the HBase scan
>    2. Counts all the elements
>    3. Caches the RDD content

Re: HBase row count

Posted by Cheng Lian <rh...@gmail.com>.
RDD.cache() is a lazy operation: the method itself doesn't perform the
cache operation; it just asks the Spark runtime to cache the content of
the RDD when the first action is invoked.  In your case, the first action
is the first count() call, which conceptually does 3 things:

   1. Performs the HBase scan
   2. Counts all the elements
   3. Caches the RDD content
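
The three steps can be made visible by counting how often the source rows
are actually computed. This is a minimal sketch, assuming Spark 2.0 or later
(for sc.longAccumulator), a local master, and a parallelized range standing
in for the HBase scan; LazyCacheDemo and its data are illustrative only.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical demo object; returns how many rows had been computed
// after cache(), after the first count(), and after the second count().
object LazyCacheDemo {
  def run(): (Long, Long, Long) = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("lazy-cache-demo")
    val sc = new SparkContext(conf)
    try {
      val computed = sc.longAccumulator("rowsComputed")

      // Stand-in for the HBase scan: each evaluated row bumps the accumulator.
      val rows = sc.parallelize(1 to 1000, 4).map { i => computed.add(1); i }

      rows.cache()                    // lazy: nothing is scanned or stored here
      val afterCache = computed.sum

      rows.count()                    // first action: scans, counts, and caches
      val afterFirst = computed.sum

      rows.count()                    // second action: read from the cache
      val afterSecond = computed.sum

      (afterCache, afterFirst, afterSecond)
    } finally {
      sc.stop()
    }
  }
}
```

With data this small the cache holds everything, so the counter should stop
growing after the first count(). If the cached partitions did not fit in
memory, the second count() would recompute some of them.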



On Tue, Feb 25, 2014 at 3:38 PM, Soumitra Kumar <ku...@gmail.com> wrote:

> I did try 'hBaseRDD.cache()', but I don't see any improvement.
>
> My expectation is that with the cache enabled, there should not be any
> penalty for the 'hBaseRDD.count' call.

Re: HBase row count

Posted by Nick Pentreath <ni...@gmail.com>.
Currently, no, there is no way to save the web UI details. There was some
discussion on the mailing list about adding this, but no change as yet.

On Tue, Feb 25, 2014 at 7:23 PM, Soumitra Kumar <ku...@gmail.com>
wrote:

> Found the issue: the splits in HBase were not uniform, so one job was
> taking 90% of the time.
> BTW, is there a way to save the details available on port 4040 after the
> job is finished?

Re: HBase row count

Posted by Soumitra Kumar <ku...@gmail.com>.
Found the issue: the splits in HBase were not uniform, so one job was
taking 90% of the time.
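
One way to spot this kind of skew is to count the rows in each partition.
This is a minimal sketch, assuming a local master and a deliberately uneven
parallelized range standing in for non-uniform HBase splits; PartitionSkew
and its data are illustrative only.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical demo object; returns (partition index, row count) pairs.
object PartitionSkew {
  def sizes(): Array[(Int, Int)] = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("partition-skew")
    val sc = new SparkContext(conf)
    try {
      // Four partitions; the filter keeps nearly all of the first one and
      // only a handful of rows from the others, mimicking uneven splits.
      val rows = sc.parallelize(1 to 1000, 4).filter(i => i <= 250 || i % 100 == 0)

      // Count rows per partition without moving the data itself.
      rows.mapPartitionsWithIndex { (index, it) => Iterator((index, it.size)) }.collect()
    } finally {
      sc.stop()
    }
  }
}
```

A partition that holds most of the rows means a single task does most of the
work, which matches the symptom of one slow unit dominating the run time.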

BTW, is there a way to save the details available on port 4040 after the
job is finished?


On Tue, Feb 25, 2014 at 7:26 AM, Nick Pentreath <ni...@gmail.com> wrote:

> It's tricky really since you may not know upfront how much data is in
> there. You could possibly take a look at how much data is in the HBase
> tables to get an idea.
>
> It may take a bit of trial and error, like running out of memory trying to
> cache the dataset, and checking the Spark UI on port 4040 to see how much
> is cached and how much memory still remains available, etc etc. You should
> also take a look at http://spark.apache.org/docs/latest/tuning.html for
> ideas around memory and serialization tuning.
>
> Broadly speaking, what you want to try to do is filter as much data as
> possible first and cache the subset of data on which you'll be performing
> multiple passes or computations. For example, based on your code above, you
> may in fact only wish to cache the data that is the "interesting" fields
> RDD. It all depends on what you're trying to achieve.
>
> If you will only be doing one pass through the data anyway (like running a
> count every time on the full dataset) then caching is not going to help you.

Re: HBase row count

Posted by Nick Pentreath <ni...@gmail.com>.
It's tricky really since you may not know upfront how much data is in
there. You could possibly take a look at how much data is in the HBase
tables to get an idea.

It may take a bit of trial and error, like running out of memory trying to
cache the dataset, and checking the Spark UI on port 4040 to see how much
is cached and how much memory still remains available, etc etc. You should
also take a look at http://spark.apache.org/docs/latest/tuning.html for
ideas around memory and serialization tuning.
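
The tuning guide linked above mentions SizeEstimator for measuring object
sizes. The sketch below extrapolates from a small sample to a rough total.
It assumes spark-core is on the classpath; CacheSizeEstimate, estimateBytes,
and the idea of sampling with take() are illustrative and not from this
thread, and the result is only a ballpark figure.

```scala
import org.apache.spark.util.SizeEstimator

// Hypothetical helper: rough in-memory size of `totalRows` deserialized rows,
// extrapolated from the measured size of a small sample.
object CacheSizeEstimate {
  def estimateBytes(sample: Seq[AnyRef], totalRows: Long): Long = {
    require(sample.nonEmpty, "need at least one sampled row")
    // Size of the sample array plus every object reachable from it.
    val sampleBytes = SizeEstimator.estimate(sample.toArray)
    sampleBytes / sample.size * totalRows
  }
}
```

A sample could come from something like rdd.take(1000), with the total row
count taken from an earlier count(). Comparing the estimate with the storage
memory shown in the Spark UI gives a first indication of whether caching is
realistic.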

Broadly speaking, what you want to try to do is filter as much data as
possible first and cache the subset of data on which you'll be performing
multiple passes or computations. For example, based on your code above, you
may in fact only wish to cache the data that is the "interesting" fields
RDD. It all depends on what you're trying to achieve.
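
A minimal sketch of caching only the filtered subset, assuming Spark 2.0 or
later, a local master, and a parallelized range standing in for the full
table; CacheFilteredSubset and the filter condition are illustrative only.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical demo object; returns (interesting rows, their sum,
// number of source rows scanned across both passes).
object CacheFilteredSubset {
  def run(): (Long, Long, Long) = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("cache-filtered-subset")
    val sc = new SparkContext(conf)
    try {
      val scanned = sc.longAccumulator("rowsScanned")

      // Stand-in for the full table scan.
      val table = sc.parallelize(1 to 10000, 4).map { i => scanned.add(1); i }

      // Filter first, then cache only what later passes will reuse.
      val interesting = table.filter(_ % 100 == 0).cache()

      val howMany = interesting.count()                    // pass 1: scans and caches the subset
      val total = interesting.map(_.toLong).reduce(_ + _)  // pass 2: reads the cached subset

      (howMany, total, scanned.sum)
    } finally {
      sc.stop()
    }
  }
}
```

Only the small filtered RDD occupies cache memory, and the full table is
scanned once even though two actions run against the subset.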

If you will only be doing one pass through the data anyway (like running a
count every time on the full dataset) then caching is not going to help you.


On Tue, Feb 25, 2014 at 4:59 PM, Soumitra Kumar <ku...@gmail.com> wrote:

> Thanks Nick.
>
> How do I figure out if the RDD fits in memory?

Re: HBase row count

Posted by Soumitra Kumar <ku...@gmail.com>.
Thanks Nick.

How do I figure out if the RDD fits in memory?


On Tue, Feb 25, 2014 at 1:04 AM, Nick Pentreath <ni...@gmail.com> wrote:

> cache only caches the data on the first action (count) - the first time it
> still needs to read the data from the source. So the first time you call
> count it will take the same amount of time whether cache is enabled or not.
> The second time you call count on a cached RDD, you should see that it
> takes a lot less time (assuming that the data fit in memory).

Re: HBase row count

Posted by Nick Pentreath <ni...@gmail.com>.
cache only caches the data on the first action (count) - the first time it
still needs to read the data from the source. So the first time you call
count it will take the same amount of time whether cache is enabled or not.
The second time you call count on a cached RDD, you should see that it
takes a lot less time (assuming that the data fit in memory).


On Tue, Feb 25, 2014 at 9:38 AM, Soumitra Kumar <ku...@gmail.com> wrote:

> I did try 'hBaseRDD.cache()', but I don't see any improvement.
>
> My expectation is that with the cache enabled, there should not be any
> penalty for the 'hBaseRDD.count' call.

Re: HBase row count

Posted by Soumitra Kumar <ku...@gmail.com>.
I did try 'hBaseRDD.cache()', but I don't see any improvement.

My expectation is that, with the cache enabled, there should be no penalty
for the 'hBaseRDD.count' call.



On Mon, Feb 24, 2014 at 11:29 PM, Nick Pentreath
<ni...@gmail.com>wrote:

> Yes, you're initiating a scan for each count call. The normal way to
> improve this would be to use cache(), which is what you have in your
> commented out line:
> // hBaseRDD.cache()
>
> If you uncomment that line, you should see an improvement overall.
>
> If caching is not an option for some reason (maybe data is too large),
> then you can implement an overall count in your readFields method using
> accumulators:
>
> val count = sc.accumulator(0L)
> ...
> In your flatMap function do count += 1 for each row (regardless of
> whether "interesting" or not).
>
> In your main method after doing an action (e.g. count in your case), call val
> totalCount = count.value.

Re: HBase row count

Posted by Nick Pentreath <ni...@gmail.com>.
Yes, you're initiating a scan for each count call. The normal way to
improve this would be to use cache(), which is what you have in your
commented out line:
// hBaseRDD.cache()

If you uncomment that line, you should see an improvement overall.

If caching is not an option for some reason (maybe data is too large), then
you can implement an overall count in your readFields method using
accumulators:

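val count = sc.accumulator(0L)
...
In your flatMap function, do count += 1 for each row (regardless of whether
it is "interesting" or not).

In your main method, after running an action (e.g. the count on the result of
readFields in your case), read the total with val totalCount = count.value.
The accumulator is only filled in once that action has actually run.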
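
A rough sketch of the accumulator approach, under some assumptions: it uses
sc.longAccumulator, which is the accumulator API in Spark 2.0 and later (the
sc.accumulator(0L) call above is the older form), and it replaces the HBase
rows with a small in-memory RDD of Option values. The object and variable
names are made up for illustration.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object SinglePassCounts {
  def main(args: Array[String]): Unit = {
    // Assumption: local mode, just for the demonstration.
    val conf = new SparkConf().setAppName("single-pass-counts").setMaster("local[2]")
    val sc = new SparkContext(conf)
    try {
      // Hypothetical stand-in for the HBase rows: Some(value) when the
      // "src" column is present, None when it is missing.
      val rows = sc.parallelize(Seq(Some("a"), None, Some("b"), None, None), 2)

      // Counts every row seen by the flatMap, interesting or not.
      val totalRows = sc.longAccumulator("totalRows")

      val interesting = rows.flatMap { row =>
        totalRows.add(1L)
        row.toList // a missing column yields an empty list, so the row is dropped
      }

      // One action, one pass over the data. The accumulator is only
      // populated after this action has run.
      val interestingCount = interesting.count()

      println(s"total rows: ${totalRows.value}, interesting rows: $interestingCount")
    } finally {
      sc.stop()
    }
  }
}
```

One caveat: updates made inside a transformation such as flatMap can be
applied more than once if a task is retried or the RDD is recomputed, so the
total is best treated as reliable only when the job runs cleanly in a single
pass.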




On Tue, Feb 25, 2014 at 9:15 AM, Soumitra Kumar <ku...@gmail.com>wrote:

> I have code that reads an HBase table and counts the number of rows
> containing a field.
>
>     def readFields(rdd : RDD[(ImmutableBytesWritable, Result)]) : RDD[List[Array[Byte]]] = {
>         return rdd.flatMap(kv => {
>             // Set of interesting keys for this use case
>             val keys = List ("src")
>             var data = List[Array[Byte]]()
>             var usefulRow = false
>
>             val cf = Bytes.toBytes ("cf")
>             keys.foreach {key =>
>                 val col = kv._2.getValue(cf, Bytes.toBytes(key))
>                 if (col != null)
>                     usefulRow = true
>                 data = data :+ col
>             }
>
>             if (usefulRow)
>                 Some(data)
>             else
>                 None
>         })
>     }
>
>     def main(args: Array[String]) {
>         val hBaseRDD = init(args)
>         // hBaseRDD.cache()
>
>         println("**** Initial row count " + hBaseRDD.count())
>         println("**** Rows with interesting fields " + readFields(hBaseRDD).count())
>     }
>
>
> I am running on a one-node CDH installation.
>
> As is, it takes around 2.5 minutes. But if I comment out
> 'println("**** Initial row count " + hBaseRDD.count())', it takes around
> 1.5 minutes.
>
> Is it doing the HBase scan twice, once for each 'count' call? How do I
> improve it?
>
> Thanks,
> -Soumitra.
>