Posted to user@phoenix.apache.org by "Long, Xindian" <Xi...@sensus.com> on 2016/06/08 22:01:12 UTC

phoenix spark options not supporint query in dbtable

The Spark JDBC data source supports specifying a query as the "dbtable" option.
I assume everything in such a query is pushed down to the database instead of being done in Spark.

The phoenix-spark plugin does not seem to support that. Why is that? Are there any plans to support it in the future?

I know phoenix-spark does support an optional select clause and predicate pushdown in some cases, but it is limited.

Thanks

Xindian


-------------------------------------------
Xindian "Shindian" Long
Mobile:  919-9168651
Email: Xindian.Long@gmail.com




Re: phoenix spark options not supporint query in dbtable

Posted by Josh Mahonin <jm...@gmail.com>.
You're mostly at the mercy of HBase and Phoenix to ensure that your data is
evenly distributed in the underlying regions. You could look at
pre-splitting or salting [1] your tables, as well as adjusting the
guidepost parameters [2] if you need finer-grained control.

If you end up with more idle Spark workers than RDD partitions, a pattern
I've seen is to simply repartition() the RDD / DataFrame after it's loaded
to a higher level of parallelism. You pay some overhead cost to
redistribute the data between executors, but you may make it up by having
more workers processing the data.

Josh

[1] https://phoenix.apache.org/salted.html
[2] https://phoenix.apache.org/tuning_guide.html
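
As a rough illustration of the repartition() pattern described above, here is a minimal sketch. It assumes Spark 2.x or later (SparkSession) and uses a synthetic local DataFrame with 4 partitions as a stand-in for one loaded through phoenix-spark; the object name, the helper widen, and the target of 16 partitions are all hypothetical choices, not values from this thread.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

object RepartitionSketch {
  // Redistribute rows into `targetPartitions` partitions. This triggers a
  // shuffle, which is the overhead cost mentioned above.
  def widen(df: DataFrame, targetPartitions: Int): DataFrame =
    df.repartition(targetPartitions)

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[4]")
      .appName("repartition-sketch")
      .getOrCreate()

    // Assumption: synthetic data in 4 partitions, mimicking a table that
    // produces 4 parallel scans when loaded via phoenix-spark.
    val loaded = spark.range(0L, 1000L, 1L, 4).toDF("ID")
    println(s"partitions after load: ${loaded.rdd.getNumPartitions}")

    val widened = widen(loaded, 16)
    println(s"partitions after repartition: ${widened.rdd.getNumPartitions}")

    spark.stop()
  }
}
```

Whether the shuffle pays for itself depends on how much work each row needs downstream, so it is worth measuring both variants on real data.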

On Thu, Aug 17, 2017 at 2:36 PM, Kanagha <er...@gmail.com> wrote:

> Thanks for the details.
>
> I tested out and saw that the no.of partitions equals to the no.of
> parallel scans run upon DataFrame load in phoenix 4.10.
> Also, how can we ensure that the read gets evenly distributed as tasks
> across the no.of executors set for the job? I'm running
> phoenixTableAsDataFrame API on a table with 4-way parallel scans and with
> 4 executors set for the job. Thanks for the inputs.
>
>
> Kanagha

Re: phoenix spark options not supporint query in dbtable

Posted by Kanagha <er...@gmail.com>.
Thanks for the details.

I tested this and saw that the number of partitions equals the number of
parallel scans run on DataFrame load in Phoenix 4.10.
Also, how can we ensure that the read is evenly distributed as tasks
across the number of executors set for the job? I'm running the
phoenixTableAsDataFrame API on a table with 4-way parallel scans and with
4 executors set for the job. Thanks for the input.


Kanagha

On Thu, Aug 17, 2017 at 7:17 AM, Josh Mahonin <jm...@gmail.com> wrote:

> Hi,
>
> Phoenix is able to parallelize queries based on the underlying HBase
> region splits, as well as its own internal guideposts based on statistics
> collection [1]
>
> The phoenix-spark connector exposes those splits to Spark for the RDD /
> DataFrame parallelism. In order to test this out, you can try run an
> EXPLAIN SELECT... query [2] to mimic the DataFrame load to see how many
> parallel scans will be run, and then compare those to the RDD / DataFrame
> partition count (some_rdd.partitions.size). In Phoenix 4.10 and above [3],
> they will be the same. In versions below that, the partition count will
> equal the number of regions for that table.
>
> Josh
>
> [1] https://phoenix.apache.org/update_statistics.html
> [2] https://phoenix.apache.org/tuning_guide.html
> [3] https://issues.apache.org/jira/browse/PHOENIX-3600

Re: phoenix spark options not supporint query in dbtable

Posted by Josh Mahonin <jm...@gmail.com>.
Hi,

Phoenix is able to parallelize queries based on the underlying HBase region
splits, as well as its own internal guideposts based on statistics
collection [1].

The phoenix-spark connector exposes those splits to Spark for the RDD /
DataFrame parallelism. In order to test this out, you can try running an
EXPLAIN SELECT... query [2] to mimic the DataFrame load to see how many
parallel scans will be run, and then compare those to the RDD / DataFrame
partition count (some_rdd.partitions.size). In Phoenix 4.10 and above [3],
they will be the same. In versions below that, the partition count will
equal the number of regions for that table.

Josh

[1] https://phoenix.apache.org/update_statistics.html
[2] https://phoenix.apache.org/tuning_guide.html
[3] https://issues.apache.org/jira/browse/PHOENIX-3600
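
The comparison described above could be sketched roughly as follows. This is an untested outline that needs a reachable Phoenix cluster and the Phoenix client and phoenix-spark jars on the classpath. The table TABLE1, its columns, and the host phoenix-server:2181 are borrowed from the examples elsewhere in this thread and are assumptions here, as is the object name.

```scala
import java.sql.DriverManager

import org.apache.hadoop.conf.Configuration
import org.apache.phoenix.spark._
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext

object CompareScansToPartitions {
  def main(args: Array[String]): Unit = {
    // Assumption: ZooKeeper host and port of the Phoenix cluster.
    val zkHost = "phoenix-server"
    val zkPort = "2181"

    // 1. Ask Phoenix how it plans to run the equivalent query and print
    //    the plan so the parallel scan count can be read off by eye.
    val conn = DriverManager.getConnection(s"jdbc:phoenix:$zkHost:$zkPort")
    try {
      val rs = conn.createStatement()
        .executeQuery("EXPLAIN SELECT ID, COL1 FROM TABLE1")
      while (rs.next()) println(rs.getString(1))
    } finally {
      conn.close()
    }

    // 2. Load the same columns through phoenix-spark and count partitions.
    val configuration = new Configuration()
    configuration.set("hbase.zookeeper.quorum", zkHost)
    configuration.set("hbase.zookeeper.property.clientPort", zkPort)

    val sc = new SparkContext("local", "phoenix-test")
    val sqlContext = new SQLContext(sc)
    val df = sqlContext.phoenixTableAsDataFrame(
      "TABLE1", Array("ID", "COL1"), conf = configuration
    )
    println(s"DataFrame partitions: ${df.rdd.partitions.size}")

    sc.stop()
  }
}
```

The plan text is printed as-is rather than parsed, since its exact wording can differ between Phoenix versions.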


On Thu, Aug 17, 2017 at 3:07 AM, Kanagha <er...@gmail.com> wrote:

> Also, I'm using phoenixTableAsDataFrame API to read from a pre-split
> phoenix table. How can we ensure read is parallelized across all executors?
> Would salting/pre-splitting tables help in providing parallelism?
> Appreciate any inputs.
>
> Thanks
>
>
> Kanagha

Re: phoenix spark options not supporint query in dbtable

Posted by Kanagha <er...@gmail.com>.
Also, I'm using the phoenixTableAsDataFrame API to read from a pre-split
Phoenix table. How can we ensure the read is parallelized across all executors?
Would salting/pre-splitting tables help in providing parallelism?
I'd appreciate any input.

Thanks


Kanagha

On Wed, Aug 16, 2017 at 10:16 PM, kanagha <er...@gmail.com> wrote:

> Hi Josh,
>
> Per your previous post, it is mentioned "The phoenix-spark parallelism is
> based on the splits provided by the Phoenix query planner, and has no
> requirements on specifying partition columns or upper/lower bounds."
>
> Does it depend upon the region splits on the input table for parallelism?
> Could you please provide more details?
>
>
> Thanks
>
>
>
> --
> View this message in context: http://apache-phoenix-user-
> list.1124778.n5.nabble.com/phoenix-spark-options-not-
> supporint-query-in-dbtable-tp1915p3810.html
> Sent from the Apache Phoenix User List mailing list archive at Nabble.com.
>

Re: phoenix spark options not supporint query in dbtable

Posted by kanagha <er...@gmail.com>.
Hi Josh,

Your previous post says: "The phoenix-spark parallelism is
based on the splits provided by the Phoenix query planner, and has no
requirements on specifying partition columns or upper/lower bounds."

Does it depend upon the region splits on the input table for parallelism?
Could you please provide more details?


Thanks 



--
View this message in context: http://apache-phoenix-user-list.1124778.n5.nabble.com/phoenix-spark-options-not-supporint-query-in-dbtable-tp1915p3810.html
Sent from the Apache Phoenix User List mailing list archive at Nabble.com.

Re: phoenix spark options not supporint query in dbtable

Posted by Josh Mahonin <jm...@gmail.com>.
They're effectively the same code paths. However, I'd recommend using the
Data Frame API unless you have a specific need to pass in a custom
Configuration object.

The Data Frame API has bindings in Scala, Java and Python, so that's
another advantage. The phoenix-spark docs have a PySpark example, but it's
applicable to Java (and Scala) as well.

Josh
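
For reference, the Data Source API route can also be written with Spark's generic reader and writer, which is the form that carries over to Java and Python almost unchanged. The following is a minimal, untested sketch that needs a running Phoenix cluster; it assumes Spark 1.4 or later, reuses TABLE1 and phoenix-server:2181 from the examples in this thread, and OUTPUT_TABLE is a hypothetical, pre-created Phoenix table with matching columns.

```scala
import org.apache.spark.SparkContext
import org.apache.spark.sql.{SQLContext, SaveMode}

object DataSourceApiSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext("local", "phoenix-test")
    val sqlContext = new SQLContext(sc)

    // Read: no Phoenix-specific Scala implicits are needed, only the
    // data source name and its options.
    val df = sqlContext.read
      .format("org.apache.phoenix.spark")
      .option("table", "TABLE1")
      .option("zkUrl", "phoenix-server:2181")
      .load()

    df.filter(df("COL1") === "test_row_1")
      .select(df("ID"))
      .show()

    // Write: roughly the counterpart of saveToPhoenix.
    // Assumption: OUTPUT_TABLE already exists with the same columns.
    df.write
      .format("org.apache.phoenix.spark")
      .mode(SaveMode.Overwrite)
      .option("table", "OUTPUT_TABLE")
      .option("zkUrl", "phoenix-server:2181")
      .save()

    sc.stop()
  }
}
```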

On Thu, Jun 9, 2016 at 1:02 PM, Long, Xindian <Xi...@sensus.com>
wrote:

> Hi, Josh:
>
> Thanks for the answer. Do you know the underlining difference between the
> following two ways of Loading a Dataframe? (using the Data Source API, or
> Load as a DataFrame directly using a Configuration object)
>
> Is there a  Java interface to use the functionality of
> phoenixTableAsDataFrame, saveToPhoenix ?
>
> Thanks
>
> Xindian
>
> Load as a DataFrame using the Data Source API
>
> import org.apache.spark.SparkContext
> import org.apache.spark.sql.SQLContext
> import org.apache.phoenix.spark._
>
> val sc = new SparkContext("local", "phoenix-test")
> val sqlContext = new SQLContext(sc)
>
> val df = sqlContext.load(
>   "org.apache.phoenix.spark",
>   Map("table" -> "TABLE1", "zkUrl" -> "phoenix-server:2181")
> )
>
> df
>   .filter(df("COL1") === "test_row_1" && df("ID") === 1L)
>   .select(df("ID"))
>   .show
>
> Or Load Load as a DataFrame directly using a Configuration object
>
> import org.apache.hadoop.conf.Configuration
> import org.apache.spark.SparkContext
> import org.apache.spark.sql.SQLContext
> import org.apache.phoenix.spark._
>
> val configuration = new Configuration()
> // Can set Phoenix-specific settings, requires 'hbase.zookeeper.quorum'
>
> val sc = new SparkContext("local", "phoenix-test")
> val sqlContext = new SQLContext(sc)
>
> // Load the columns 'ID' and 'COL1' from TABLE1 as a DataFrame
> val df = sqlContext.phoenixTableAsDataFrame(
>   "TABLE1", Array("ID", "COL1"), conf = configuration
> )
>
> df.show

RE: phoenix spark options not supporint query in dbtable

Posted by "Long, Xindian" <Xi...@sensus.com>.
Hi, Josh:

Thanks for the answer. Do you know the underlying difference between the following two ways of loading a DataFrame (using the Data Source API, or loading as a DataFrame directly using a Configuration object)?

Is there a Java interface for the functionality of phoenixTableAsDataFrame and saveToPhoenix?

Thanks

Xindian

Load as a DataFrame using the Data Source API:
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext
import org.apache.phoenix.spark._

val sc = new SparkContext("local", "phoenix-test")
val sqlContext = new SQLContext(sc)

val df = sqlContext.load(
  "org.apache.phoenix.spark",
  Map("table" -> "TABLE1", "zkUrl" -> "phoenix-server:2181")
)

df
  .filter(df("COL1") === "test_row_1" && df("ID") === 1L)
  .select(df("ID"))
  .show

Or load as a DataFrame directly using a Configuration object:
import org.apache.hadoop.conf.Configuration
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext
import org.apache.phoenix.spark._

val configuration = new Configuration()
// Can set Phoenix-specific settings, requires 'hbase.zookeeper.quorum'

val sc = new SparkContext("local", "phoenix-test")
val sqlContext = new SQLContext(sc)

// Load the columns 'ID' and 'COL1' from TABLE1 as a DataFrame
val df = sqlContext.phoenixTableAsDataFrame(
  "TABLE1", Array("ID", "COL1"), conf = configuration
)

df.show



From: Josh Mahonin [mailto:jmahonin@gmail.com]
Sent: June 9, 2016 9:44
To: user@phoenix.apache.org
Subject: Re: phoenix spark options not supporint query in dbtable

Hi Xindian,

The phoenix-spark integration is based on the Phoenix MapReduce layer, which doesn't support aggregate functions. However, as you mentioned, both filtering and pruning predicates are pushed down to Phoenix. With an RDD or DataFrame loaded, all of Spark's various aggregation methods are available to you.

Although the Spark JDBC data source supports the full complement of Phoenix's supported queries, the way it achieves parallelism is to split the query across a number of workers and connections based on a 'partitionColumn' with a 'lowerBound' and 'upperBound', which must be numeric. If your use case has numeric primary keys, then that is potentially a good solution for you. [1]

The phoenix-spark parallelism is based on the splits provided by the Phoenix query planner, and has no requirements on specifying partition columns or upper/lower bounds. It's up to you to evaluate which technique is the right method for your use case. [2]

Good luck,

Josh
[1] http://spark.apache.org/docs/latest/sql-programming-guide.html#jdbc-to-other-databases
[2] https://phoenix.apache.org/phoenix_spark.html







Re: phoenix spark options not supporint query in dbtable

Posted by Josh Mahonin <jm...@gmail.com>.
Hi Xindian,

The phoenix-spark integration is based on the Phoenix MapReduce layer,
which doesn't support aggregate functions. However, as you mentioned, both
filtering and pruning predicates are pushed down to Phoenix. With an RDD or
DataFrame loaded, all of Spark's various aggregation methods are available
to you.

Although the Spark JDBC data source supports the full complement of
Phoenix's supported queries, the way it achieves parallelism is to split
the query across a number of workers and connections based on a
'partitionColumn' with a 'lowerBound' and 'upperBound', which must be
numeric. If your use case has numeric primary keys, then that is
potentially a good solution for you. [1]

The phoenix-spark parallelism is based on the splits provided by the
Phoenix query planner, and has no requirements on specifying partition
columns or upper/lower bounds. It's up to you to evaluate which technique
is the right method for your use case. [2]

Good luck,

Josh

[1]
http://spark.apache.org/docs/latest/sql-programming-guide.html#jdbc-to-other-databases
[2] https://phoenix.apache.org/phoenix_spark.html
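
To make the 'partitionColumn' / 'lowerBound' / 'upperBound' mechanism more concrete, here is a simplified model in plain Scala of how such bounds can be turned into one WHERE clause per partition. It is an illustration of the idea only, not Spark's actual implementation; the object and function names are hypothetical.

```scala
object JdbcStrideSketch {
  // Build one predicate per partition by cutting [lower, upper) into
  // equal strides on a numeric column. The first and last predicates are
  // open-ended, so rows outside the bounds are still read; the bounds
  // only shape the split, they do not filter.
  def partitionPredicates(
      column: String,
      lower: Long,
      upper: Long,
      numPartitions: Int): Vector[String] = {
    require(numPartitions >= 2, "need at least two partitions to split")
    require(upper - lower >= numPartitions, "range too small to split")

    val stride = (upper - lower) / numPartitions
    // Interior boundaries between consecutive partitions.
    val bounds = (1 until numPartitions).map(i => lower + i * stride)

    val first = s"$column < ${bounds.head} OR $column IS NULL"
    val last = s"$column >= ${bounds.last}"
    val middle = bounds.sliding(2).collect {
      case Seq(lo, hi) => s"$column >= $lo AND $column < $hi"
    }.toVector

    first +: middle :+ last
  }

  def main(args: Array[String]): Unit = {
    // Each printed predicate would back one query on its own connection.
    partitionPredicates("ID", 0L, 100L, 4).foreach(println)
  }
}
```

This also shows why the column has to be numeric and why skewed key values lead to uneven partitions: the strides are equal in key range, not in row count.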


On Wed, Jun 8, 2016 at 6:01 PM, Long, Xindian <Xi...@sensus.com>
wrote:

> The Spark JDBC data source supports to specify a query as the  “dbtable”
> option.
> I assume all queries in the above query in pushed down to the database
> instead of done in Spark.
>
> The  phoenix spark plug in seems not supporting that. Why is that? Any
> plan in the future to support it?
>
> I know phoenix spark does support an optional select clause and predicate
> push down in some cases, but it is limited.
>
> Thanks
>
> Xindian
>
> -------------------------------------------
> Xindian “Shindian” Long
> Mobile:  919-9168651
> Email: Xindian.Long@gmail.com