Posted to user@phoenix.apache.org by Mark Heppner <he...@gmail.com> on 2017/01/19 16:30:54 UTC

Moving column family into new table

Our use case is to analyze images using Spark. The images are typically
~1MB each, so in order to prevent the small files problem in HDFS, we went
with HBase and Phoenix. For 20+ million images and metadata, this has been
working pretty well so far. Since this is pretty new to us, we didn't
create a robust design:

CREATE TABLE IF NOT EXISTS mytable
(
    id VARCHAR(36) NOT NULL PRIMARY KEY,
    title VARCHAR,
    ...
    image.dtype VARCHAR(12),
    image.width UNSIGNED_INT,
    image.height UNSIGNED_INT,
    image.data VARBINARY
)

Most queries are on the metadata, so all of that is kept in the default
column family. Only the image data is stored in a secondary column family.
Additional indexes are created anyway, so the main table isn't usually
touched.

We first run a Phoenix query to check if there are any matches. If so, then
we start a Spark job on the images. The primary keys are sent to the
PySpark job, which then grabs the images based on the primary keys:

df = sqlContext.read \
    .format('org.apache.phoenix.spark') \
    .option('table', 'mytable') \
    .option('zkUrl', 'localhost:2181:/hbase-unsecure') \
    .load()
df.registerTempTable('mytable')

df_imgs = sqlContext.sql(
    'SELECT IMAGE FROM mytable WHERE ID = 1 OR ID = 2 ...'
)

When this was first designed, we thought that since the lookup was by
primary key, Phoenix would be smart enough to do a skip scan, but it
appears to be doing a full scan instead. df_imgs.rdd.getNumPartitions()
ends up being 450+, which matches the number of split files in HDFS.
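
One way we could double-check this from the Spark side (a rough sketch;
the ids are placeholders, and whether the predicate actually pushes down
may depend on the connector version):

# Filter on the DataFrame column directly and print the extended plan.
# If the connector supports pushdown, the id predicate should appear
# under PushedFilters rather than in a post-scan Filter step.
df_imgs = df.filter(df['ID'].isin('id-1', 'id-2'))  # placeholder ids
df_imgs.select('IMAGE').explain(True)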

Would it be better to use a foreign key and split the tables:

CREATE TABLE IF NOT EXISTS mytable
(
    id VARCHAR(36) NOT NULL PRIMARY KEY,
    title VARCHAR,
    image_id VARCHAR(36)
);
CREATE TABLE IF NOT EXISTS images
(
    image_id VARCHAR(36) NOT NULL PRIMARY KEY,
    dtype VARCHAR(12),
    width UNSIGNED_INT,
    height UNSIGNED_INT,
    data VARBINARY
);

If the first query grabs the image_ids and sends them to Spark, would Spark
be able to handle the query more efficiently?

If this is a better design, is there any way of moving the "image" column
family from "mytable" to the default column family of the new "images"
table? Is it possible to create the new table with the "image_id"s, make
the foreign keys, then move the column family into the new table?
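
For reference, if Phoenix's UPSERT ... SELECT works across tables the way
we expect, a rough sketch of the migration could look like this (the
phoenixdb adapter and Query Server URL are placeholders for whatever
client we'd actually use):

# Copy the image column family into the new table server-side.
import phoenixdb

conn = phoenixdb.connect('http://localhost:8765/', autocommit=True)
cur = conn.cursor()
cur.execute("""
    UPSERT INTO images (image_id, dtype, width, height, data)
    SELECT id, image.dtype, image.width, image.height, image.data
    FROM mytable
""")
# Backfill the foreign key column, assuming image_id simply mirrors id:
cur.execute('UPSERT INTO mytable (id, image_id) SELECT id, id FROM mytable')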


-- 
Mark Heppner

Re: Moving column family into new table

Posted by Mark Heppner <he...@gmail.com>.
I'll check when I'm on site tomorrow, but our (much smaller) local cluster
is using the default hbase.hregion.max.filesize of 10 GB for HDP.
hbase.hregion.majorcompaction is set to 7 days, so I'm sure it would have
run by now.

What would be the best file size limit? Cloudera suggests having 20-200
regions per RegionServer. Should I try increasing it to 20 GB? And why does
Cloudera also say that 5-10 GB is optimal? Is the goal just to get more
regions?
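
To get hard numbers for that, I could count regions per server along
these lines (a sketch; it assumes the happybase client and an HBase
Thrift server, which may not match our setup, and the Master UI shows
the same thing):

# Count this table's regions per RegionServer, to compare against the
# 20-200 regions-per-server guidance. Phoenix stores the table uppercase.
from collections import Counter
import happybase

conn = happybase.Connection('localhost')
regions = conn.table('MYTABLE').regions()
print('total regions:', len(regions))
print(Counter(region.get('server_name') for region in regions))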

The Spark tasks don't get killed; some sort of RPC call times out. Nothing
appears in the YARN logs, and the Spark history server just shows a state
of "failed."

We did small-scale testing of Phoenix+Spark first and fine-tuned as much as
we could, which is how we settled on the current design. Query times seemed
pretty predictable as we scaled up, and our use cases would still fit
within the upper bounds of what we estimated for querying on more data. The
direct queries in Phoenix aren't an issue at all; they're actually working
great. We're just facing a new problem with Spark+Phoenix.

I'll check the on-site cluster and if I find any bugs, I'll be sure to
report them!

On Thu, Jan 19, 2017 at 1:28 PM, Josh Mahonin <jm...@gmail.com> wrote:

> [quoted text elided; the quoted messages appear in full elsewhere in
> this archive]


-- 
Mark Heppner

Re: Moving column family into new table

Posted by Josh Mahonin <jm...@gmail.com>.
It's a bit peculiar that you've got it pre-split to 10 salt buckets but
are seeing 400+ partitions. It sounds like HBase is splitting the regions
on you, possibly due to the 'hbase.hregion.max.filesize' setting. You
should be able to check the table details in the HBase Master UI to see
how many regions there are and which nodes they're located on. Right now,
the Phoenix MR / Spark integration basically assigns one partition per
region.

As a total guess, I wonder if somehow the first 380 partitions are
relatively sparse, and the bulk of the data is in the remaining 70
partitions. You might be able to diagnose that by adding some logging in a
'mapPartitions()' call. It's possible that running a major compaction on
that table might help redistribute the data as well.
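
For example, something like this sketch, using the indexed variant of
mapPartitions (it assumes the DataFrame is loaded via phoenix-spark as in
your earlier snippet):

# Log the row count of each Spark partition so skew across the 450+
# partitions shows up in a single collect().
def partition_sizes(index, iterator):
    yield (index, sum(1 for _ in iterator))

sizes = df.rdd.mapPartitionsWithIndex(partition_sizes).collect()
for index, count in sorted(sizes, key=lambda pair: -pair[1])[:20]:
    print('partition %d: %d rows' % (index, count))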

If you're seeing your tasks getting killed, definitely try digging into the
Spark executor / driver logs to find a root cause. If you're using
YARN, you can usually get into the Spark history server, then check the
'stdout' / 'stderr' logs for each executor.

Re: architecture recommendations, it's possible that phoenix-spark isn't
the right tool for this job, though we routinely read / write billions of
rows with it. I'd recommend starting with a smaller subset of your data and
making sure you've got the schema, queries, and HBase settings set up the
way you like, then adding Spark into the mix. Then start adding a bit more
data, check results, find any bottlenecks, and tune as needed.

If you're able to identify any issues specifically with Phoenix, bug
reports and patches are greatly appreciated!

Best of luck,

Josh


On Thu, Jan 19, 2017 at 12:30 PM, Mark Heppner <he...@gmail.com>
wrote:

> [quoted text elided; the quoted messages appear in full elsewhere in
> this archive]

Re: Moving column family into new table

Posted by Mark Heppner <he...@gmail.com>.
Jonathan,

I do check the queries using EXPLAIN, but it doesn't work the same way in
Spark. In Spark, I can only see a very generic plan, and it only tells me
whether certain filters are pushed down to Phoenix or not. Query hints are
ignored, since they're first translated by the Spark or Hive query
evaluator before ever being sent to Phoenix. The design with separate
column families works pretty well for us in Phoenix directly, but not so
much in Spark. I can't figure out why Spark is slowing down so much at the
end of the scan.

On Thu, Jan 19, 2017 at 1:09 PM, Jonathan Leech <jo...@gmail.com> wrote:

> [quoted text elided; the quoted messages appear in full elsewhere in
> this archive]


-- 
Mark Heppner

Re: Moving column family into new table

Posted by Jonathan Leech <jo...@gmail.com>.
Do an explain on your query to confirm that it's doing a full scan and not a skip scan.

I typically use an IN () clause instead of OR, especially with compound keys. I have also had to hint queries to use a skip scan, e.g. /*+ SKIP_SCAN */.

Phoenix seems to do a very good job not reading data from column families that aren't needed by the query, so I think your schema design is fine.
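
A rough sketch of what I mean, end to end (the phoenixdb adapter, the
Query Server URL, and the literal ids are illustrative, not from your
setup):

# IN clause plus a skip-scan hint, verified with EXPLAIN.
import phoenixdb

conn = phoenixdb.connect('http://localhost:8765/', autocommit=True)
cur = conn.cursor()
cur.execute(
    "EXPLAIN SELECT /*+ SKIP_SCAN */ image.data "
    "FROM mytable WHERE id IN ('id-1', 'id-2')")
for row in cur.fetchall():
    print(row)  # expect SKIP SCAN, not FULL SCAN, in the plan output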

> On Jan 19, 2017, at 10:30 AM, Mark Heppner <he...@gmail.com> wrote:
> [quoted text elided; the quoted messages appear in full elsewhere in
> this archive]

Re: Moving column family into new table

Posted by Mark Heppner <he...@gmail.com>.
Thanks for the quick reply, Josh!

For our demo cluster, we have 5 nodes, so the table was already set to 10
salt buckets. I know you can increase the salt buckets after the table is
created, but how do you change the split points? The repartition in Spark
seemed to be extremely inefficient, so we were trying to skip it and keep
the 400+ default partitions.

The biggest issue we're facing is that as Spark goes through the partitions
during the scan, it becomes exponentially slower towards the end. Around
task 380/450, it grinds to a halt, eventually timing out around 410 and
getting killed. We have no idea whether this is something with Spark, YARN,
or HBase, so that's why we were brainstorming about the foreign key-based
layout, hoping that the files on HDFS would be more compacted.

We haven't noticed much network overhead, nor have we seen CPU or RAM usage
run high. Our nodes are pretty big, 32 cores and 256 GB RAM each, connected
on a 10 GbE network. Even if our query is for 80-100 rows, the Spark job
still slows to a crawl at the end, yet that should only be about 80 MB of
data pulled out of Phoenix into the executors. I guess we should have
verified that the Phoenix+Spark plugin does achieve data locality, but
there isn't anything that says otherwise. Even if it doesn't have data
locality, we have no idea why it would progressively slow down as it
reaches the end of the scan/filter.

The images are converted to NumPy arrays, then saved as binary strings into
Phoenix. In Spark, converting the binary string back to a NumPy array is
fairly quick. This also allows us to use GET_BYTE() from Phoenix to extract
specific values within the array, without going through Spark at all. Do
you have any other architecture recommendations for our use case? Would
storing the images directly in HBase be any better?
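
For context, the round trip is roughly this sketch (the helper names are
made up; dtype, width, and height come from the metadata columns):

import numpy as np

def to_phoenix_bytes(arr):
    # Raw array buffer, stored in the image.data VARBINARY column.
    return arr.tobytes()

def from_phoenix_bytes(data, dtype, width, height):
    # Rebuild the array from the metadata columns; GET_BYTE() on the
    # Phoenix side indexes into this same buffer.
    return np.frombuffer(data, dtype=np.dtype(dtype)).reshape(height, width)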

On Thu, Jan 19, 2017 at 12:02 PM, Josh Mahonin <jm...@gmail.com> wrote:

> [quoted text elided; the quoted messages appear in full elsewhere in
> this archive]


-- 
Mark Heppner

Re: Moving column family into new table

Posted by Josh Mahonin <jm...@gmail.com>.
Hi Mark,

At present, the Spark partitions are basically equivalent to the number of
regions in the underlying HBase table. This is typically something you can
control yourself, either using pre-splitting or salting (
https://phoenix.apache.org/faq.html#Are_there_any_tips_for_optimizing_Phoenix).
Given that you have 450+ partitions though, it sounds like you should be
able to achieve a decent level of parallelism, but that's a knob you can
fiddle with. It might also be useful to look at Spark's "repartition"
operation if you have idle Spark executors.
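
Roughly, those two knobs look like this (a sketch; the bucket and
partition counts are illustrative):

# Salting is fixed at CREATE TABLE time and sets the initial region
# count, and hence the initial Spark partition count:
ddl = """
CREATE TABLE IF NOT EXISTS mytable (
    id VARCHAR(36) NOT NULL PRIMARY KEY,
    title VARCHAR
) SALT_BUCKETS = 10
"""  # run via sqlline.py or any other Phoenix client

# On the Spark side, repartition() redistributes rows across idle
# executors, at the cost of a shuffle:
df = sqlContext.read \
    .format('org.apache.phoenix.spark') \
    .option('table', 'mytable') \
    .option('zkUrl', 'localhost:2181:/hbase-unsecure') \
    .load() \
    .repartition(64)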

The partitioning is sort of orthogonal to the primary key layout and the
resulting query efficiency, but the strategy you've taken with your schema
seems fairly sensible to me. Given that your primary key is the 'id' field,
the query you're using is going to be much more efficient than, e.g.,
filtering on the 'title' column. Iterating on your schema and queries using
straight SQL, and then applying that to Spark, is probably a good strategy
here to get more familiar with query performance.

If you're reading the binary 'data' column in Spark and seeing a lot of
network overhead, one thing to be aware of is that the present Phoenix MR /
Spark code isn't location aware, so executors are likely reading big chunks
of data from another node. There are a few patches to address this, but
they're not in a released version yet:

https://issues.apache.org/jira/browse/PHOENIX-3600
https://issues.apache.org/jira/browse/PHOENIX-3601

Good luck!

Josh




On Thu, Jan 19, 2017 at 11:30 AM, Mark Heppner <he...@gmail.com>
wrote:

> [quoted text elided; the original post appears in full at the top of
> this archive]