You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@phoenix.apache.org by Alex Kamil <al...@gmail.com> on 2014/11/21 23:26:20 UTC

Re: Spark SQL with Apache Phoenix lower and upper Bound

Ali,

just create a BIGINT column with numeric values in phoenix and use sequences
<http://phoenix.apache.org/sequences.html> to populate it automatically

I included the setup below in case someone starts from scratch

Prerequisites:
- export JAVA_HOME, SCALA_HOME and install sbt
- install hbase in standalone mode
<http://hbase.apache.org/book/quickstart.html>
- add phoenix jar <http://phoenix.apache.org/download.html> to hbase lib
directory
- start hbase and create a table in phoenix
<http://phoenix.apache.org/Phoenix-in-15-minutes-or-less.html> to verify
everything is working
- install spark in standalone mode, and verify that it works using spark
shell <http://spark.apache.org/docs/latest/quick-start.html>

1. create a sequence <http://phoenix.apache.org/sequences.html> in phoenix:
$PHOENIX_HOME/hadoop1/bin/sqlline.py localhost

 > CREATE SEQUENCE IF NOT EXISTS my_schema.my_sequence;

2.add a BIGINT column called e.g. "id" to your table in phoenix

> CREATE TABLE test.orders ( id BIGINT not null primary key, name VARCHAR);

3. add some values
>UPSERT INTO test.orders (id, name) VALUES( NEXT VALUE FOR
my_schema.my_sequence, 'foo');
>UPSERT INTO test.orders (id, name) VALUES( NEXT VALUE FOR
my_schema.my_sequence, 'bar');

4. create jdbc adapter (following SimpleApp setup in
Spark->GettingStarted->StandAlone
applications
<https://spark.apache.org/docs/latest/quick-start.html#Standalone_Applications>
):

//SparkToJDBC.scala

import java.sql.DriverManager
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;

import java.util.Date;

import org.apache.spark.SparkContext
import org.apache.spark.rdd.JdbcRDD

object SparkToJDBC {

  def main(args: Array[String]) {
    val sc = new SparkContext("local", "phoenix")
    try{
            val rdd = new JdbcRDD(sc,() => {

Class.forName("org.apache.phoenix.jdbc.PhoenixDriver").newInstance()

DriverManager.getConnection("jdbc:phoenix:localhost", "", "")
                   },
                   "SELECT id, name  FROM test.orders WHERE id >= ? AND id
<= ?",
                    1, 100, 3,
                    (r:ResultSet) => {
                                processResultSet(r)
                    }
                ).cache()

            println("#####################");
            println(rdd.count());
            println("#####################");
     } catch {
              case _: Throwable => println("Could not connect to database")
     }
     sc.stop()
  }

def processResultSet(rs: ResultSet){

          val rsmd = rs.getMetaData()
          val numberOfColumns = rsmd.getColumnCount()

          var i = 1
          while (i <= numberOfColumns) {
            val colName = rsmd.getColumnName(i)
            val tableName = rsmd.getTableName(i)
            val name = rsmd.getColumnTypeName(i)
            val caseSen = rsmd.isCaseSensitive(i)
            val writable = rsmd.isWritable(i)
            println("Information for column " + colName)
            println("    Column is in table " + tableName)
            println("    column type is " + name)
            println("")
            i += 1
          }

          while (rs.next()) {
            var i = 1
            while (i <= numberOfColumns) {
              val s = rs.getString(i)
              System.out.print(s + "  ")
              i += 1
            }
            println("")
          }
   }

}

5. build SparkToJDBC.scala
sbt package

6. execute spark job:
note: don't forget to add phoenix jar using --jars option like this:

../spark-1.1.0/bin/spark-submit *--jars ../phoenix-3.1.0-bin/hadoop2/*
*phoenix-3.1.0-client-hadoop2.**jar *--class "SparkToJDBC" --master
local[4] target/scala-2.10/simple-project_2.10-1.0.jar

regards
Alex


On Fri, Nov 21, 2014 at 4:34 PM, Josh Mahonin <jm...@interset.com> wrote:

> Hi Alaa Ali,
>
> In order for Spark to split the JDBC query in parallel, it expects an
> upper and lower bound for your input data, as well as a number of
> partitions so that it can split the query across multiple tasks.
>
> For example, depending on your data distribution, you could set an upper
> and lower bound on your timestamp range, and spark should be able to create
> new sub-queries to split up the data.
>
> Another option is to load up the whole table using the PhoenixInputFormat
> as a NewHadoopRDD. It doesn't yet support many of Phoenix's aggregate
> functions, but it does let you load up whole tables as RDDs.
>
> I've previously posted example code here:
>
> http://mail-archives.apache.org/mod_mbox/spark-user/201404.mbox/%3CCAJ6CGtA1DoTdadRtT5M0+75rXTyQgu5gexT+uLccw_8Ppzyt=Q@mail.gmail.com%3E
>
> There's also an example library implementation here, although I haven't
> had a chance to test it yet:
> https://github.com/simplymeasured/phoenix-spark
>
> Josh
>
> On Fri, Nov 21, 2014 at 4:14 PM, Alaa Ali <co...@gmail.com> wrote:
>
>> I want to run queries on Apache Phoenix which has a JDBC driver. The
>> query that I want to run is:
>>
>>     select ts,ename from random_data_date limit 10
>>
>> But I'm having issues with the JdbcRDD upper and lowerBound parameters
>> (that I don't actually understand).
>>
>> Here's what I have so far:
>>
>> import org.apache.spark.rdd.JdbcRDD
>> import java.sql.{Connection, DriverManager, ResultSet}
>>
>> val url="jdbc:phoenix:zookeeper"
>> val sql = "select ts,ename from random_data_date limit ?"
>> val myRDD = new JdbcRDD(sc, () => DriverManager.getConnection(url), sql,
>> 5, 10, 2, r => r.getString("ts") + ", " + r.getString("ename"))
>>
>> But this doesn't work because the sql expression that the JdbcRDD expects
>> has to have two ?s to represent the lower and upper bound.
>>
>> How can I run my query through the JdbcRDD?
>>
>> Regards,
>> Alaa Ali
>>
>
>

Re: Spark SQL with Apache Phoenix lower and upper Bound

Posted by Alaa Ali <co...@gmail.com>.
Thanks Alex! I'm actually working with views from HBase because I will
never edit the HBase table from Phoenix and I'd hate to accidentally drop
it. I'll have to work out how to create the view with the additional ID
column.

Regards,
Alaa Ali

On Fri, Nov 21, 2014 at 5:26 PM, Alex Kamil <al...@gmail.com> wrote:

> Ali,
>
> just create a BIGINT column with numeric values in phoenix and use
> sequences <http://phoenix.apache.org/sequences.html> to populate it
> automatically
>
> I included the setup below in case someone starts from scratch
>
> Prerequisites:
> - export JAVA_HOME, SCALA_HOME and install sbt
> - install hbase in standalone mode
> <http://hbase.apache.org/book/quickstart.html>
> - add phoenix jar <http://phoenix.apache.org/download.html> to hbase lib
> directory
> - start hbase and create a table in phoenix
> <http://phoenix.apache.org/Phoenix-in-15-minutes-or-less.html> to verify
> everything is working
> - install spark in standalone mode, and verify that it works using spark
> shell <http://spark.apache.org/docs/latest/quick-start.html>
>
> 1. create a sequence <http://phoenix.apache.org/sequences.html> in
> phoenix:
> $PHOENIX_HOME/hadoop1/bin/sqlline.py localhost
>
>  > CREATE SEQUENCE IF NOT EXISTS my_schema.my_sequence;
>
> 2.add a BIGINT column called e.g. "id" to your table in phoenix
>
> > CREATE TABLE test.orders ( id BIGINT not null primary key, name VARCHAR);
>
> 3. add some values
> >UPSERT INTO test.orders (id, name) VALUES( NEXT VALUE FOR
> my_schema.my_sequence, 'foo');
> >UPSERT INTO test.orders (id, name) VALUES( NEXT VALUE FOR
> my_schema.my_sequence, 'bar');
>
> 4. create jdbc adapter (following SimpleApp setup in Spark->GettingStarted->StandAlone
> applications
> <https://spark.apache.org/docs/latest/quick-start.html#Standalone_Applications>
> ):
>
> //SparkToJDBC.scala
>
> import java.sql.DriverManager
> import java.sql.Connection;
> import java.sql.DriverManager;
> import java.sql.PreparedStatement;
> import java.sql.ResultSet;
> import java.sql.SQLException;
> import java.sql.Statement;
>
> import java.util.Date;
>
> import org.apache.spark.SparkContext
> import org.apache.spark.rdd.JdbcRDD
>
> object SparkToJDBC {
>
>   def main(args: Array[String]) {
>     val sc = new SparkContext("local", "phoenix")
>     try{
>             val rdd = new JdbcRDD(sc,() => {
>
> Class.forName("org.apache.phoenix.jdbc.PhoenixDriver").newInstance()
>
> DriverManager.getConnection("jdbc:phoenix:localhost", "", "")
>                    },
>                    "SELECT id, name  FROM test.orders WHERE id >= ? AND id
> <= ?",
>                     1, 100, 3,
>                     (r:ResultSet) => {
>                                 processResultSet(r)
>                     }
>                 ).cache()
>
>             println("#####################");
>             println(rdd.count());
>             println("#####################");
>      } catch {
>               case _: Throwable => println("Could not connect to database")
>      }
>      sc.stop()
>   }
>
> def processResultSet(rs: ResultSet){
>
>           val rsmd = rs.getMetaData()
>           val numberOfColumns = rsmd.getColumnCount()
>
>           var i = 1
>           while (i <= numberOfColumns) {
>             val colName = rsmd.getColumnName(i)
>             val tableName = rsmd.getTableName(i)
>             val name = rsmd.getColumnTypeName(i)
>             val caseSen = rsmd.isCaseSensitive(i)
>             val writable = rsmd.isWritable(i)
>             println("Information for column " + colName)
>             println("    Column is in table " + tableName)
>             println("    column type is " + name)
>             println("")
>             i += 1
>           }
>
>           while (rs.next()) {
>             var i = 1
>             while (i <= numberOfColumns) {
>               val s = rs.getString(i)
>               System.out.print(s + "  ")
>               i += 1
>             }
>             println("")
>           }
>    }
>
> }
>
> 5. build SparkToJDBC.scala
> sbt package
>
> 6. execute spark job:
> note: don't forget to add phoenix jar using --jars option like this:
>
> ../spark-1.1.0/bin/spark-submit *--jars ../phoenix-3.1.0-bin/hadoop2/*
> *phoenix-3.1.0-client-hadoop2.**jar *--class "SparkToJDBC" --master
> local[4] target/scala-2.10/simple-project_2.10-1.0.jar
>
> regards
> Alex
>
>
> On Fri, Nov 21, 2014 at 4:34 PM, Josh Mahonin <jm...@interset.com>
> wrote:
>
>> Hi Alaa Ali,
>>
>> In order for Spark to split the JDBC query in parallel, it expects an
>> upper and lower bound for your input data, as well as a number of
>> partitions so that it can split the query across multiple tasks.
>>
>> For example, depending on your data distribution, you could set an upper
>> and lower bound on your timestamp range, and spark should be able to create
>> new sub-queries to split up the data.
>>
>> Another option is to load up the whole table using the PhoenixInputFormat
>> as a NewHadoopRDD. It doesn't yet support many of Phoenix's aggregate
>> functions, but it does let you load up whole tables as RDDs.
>>
>> I've previously posted example code here:
>>
>> http://mail-archives.apache.org/mod_mbox/spark-user/201404.mbox/%3CCAJ6CGtA1DoTdadRtT5M0+75rXTyQgu5gexT+uLccw_8Ppzyt=Q@mail.gmail.com%3E
>>
>> There's also an example library implementation here, although I haven't
>> had a chance to test it yet:
>> https://github.com/simplymeasured/phoenix-spark
>>
>> Josh
>>
>> On Fri, Nov 21, 2014 at 4:14 PM, Alaa Ali <co...@gmail.com> wrote:
>>
>>> I want to run queries on Apache Phoenix which has a JDBC driver. The
>>> query that I want to run is:
>>>
>>>     select ts,ename from random_data_date limit 10
>>>
>>> But I'm having issues with the JdbcRDD upper and lowerBound parameters
>>> (that I don't actually understand).
>>>
>>> Here's what I have so far:
>>>
>>> import org.apache.spark.rdd.JdbcRDD
>>> import java.sql.{Connection, DriverManager, ResultSet}
>>>
>>> val url="jdbc:phoenix:zookeeper"
>>> val sql = "select ts,ename from random_data_date limit ?"
>>> val myRDD = new JdbcRDD(sc, () => DriverManager.getConnection(url), sql,
>>> 5, 10, 2, r => r.getString("ts") + ", " + r.getString("ename"))
>>>
>>> But this doesn't work because the sql expression that the JdbcRDD
>>> expects has to have two ?s to represent the lower and upper bound.
>>>
>>> How can I run my query through the JdbcRDD?
>>>
>>> Regards,
>>> Alaa Ali
>>>
>>
>>
>