Posted to commits@phoenix.apache.org by ma...@apache.org on 2015/04/16 16:32:30 UTC
[49/50] [abbrv] phoenix git commit: PHOENIX-1815 - Spark Datasource api
PHOENIX-1815 - Spark Datasource api
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/3fb3bb4d
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/3fb3bb4d
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/3fb3bb4d
Branch: refs/heads/calcite
Commit: 3fb3bb4d2231972dc06326251b76cc1431da7386
Parents: e1bbb94
Author: ravimagham <ra...@apache.org>
Authored: Wed Apr 15 19:03:33 2015 -0700
Committer: ravimagham <ra...@apache.org>
Committed: Wed Apr 15 19:03:33 2015 -0700
----------------------------------------------------------------------
phoenix-assembly/pom.xml | 6 +-
.../src/build/components/all-common-jars.xml | 11 ++
phoenix-spark/README.md | 74 ++++++++--
phoenix-spark/pom.xml | 6 -
phoenix-spark/src/it/resources/setup.sql | 1 +
.../apache/phoenix/spark/PhoenixSparkIT.scala | 135 ++++++++++++++++---
.../phoenix/spark/ConfigurationUtil.scala | 65 +++++++++
.../phoenix/spark/DataFrameFunctions.scala | 51 +++++++
.../apache/phoenix/spark/DefaultSource.scala | 41 ++++++
.../org/apache/phoenix/spark/PhoenixRDD.scala | 12 +-
.../phoenix/spark/PhoenixRecordWritable.scala | 2 +-
.../apache/phoenix/spark/PhoenixRelation.scala | 80 +++++++++++
.../phoenix/spark/ProductRDDFunctions.scala | 21 +--
.../org/apache/phoenix/spark/package.scala | 6 +-
pom.xml | 5 +
15 files changed, 453 insertions(+), 63 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/3fb3bb4d/phoenix-assembly/pom.xml
----------------------------------------------------------------------
diff --git a/phoenix-assembly/pom.xml b/phoenix-assembly/pom.xml
index 51f767f..b3a992e 100644
--- a/phoenix-assembly/pom.xml
+++ b/phoenix-assembly/pom.xml
@@ -142,9 +142,13 @@
<groupId>org.apache.phoenix</groupId>
<artifactId>phoenix-flume</artifactId>
</dependency>
- <dependency>
+ <dependency>
<groupId>org.apache.phoenix</groupId>
<artifactId>phoenix-pig</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.phoenix</groupId>
+ <artifactId>phoenix-spark</artifactId>
+ </dependency>
</dependencies>
</project>
http://git-wip-us.apache.org/repos/asf/phoenix/blob/3fb3bb4d/phoenix-assembly/src/build/components/all-common-jars.xml
----------------------------------------------------------------------
diff --git a/phoenix-assembly/src/build/components/all-common-jars.xml b/phoenix-assembly/src/build/components/all-common-jars.xml
index ce6da59..769e28f 100644
--- a/phoenix-assembly/src/build/components/all-common-jars.xml
+++ b/phoenix-assembly/src/build/components/all-common-jars.xml
@@ -71,5 +71,16 @@
</excludes>
<fileMode>0644</fileMode>
</fileSet>
+ <fileSet>
+ <directory>${project.basedir}/../phoenix-spark/target/</directory>
+ <outputDirectory>lib</outputDirectory>
+ <includes>
+ <include>phoenix-*.jar</include>
+ </includes>
+ <excludes>
+ <exclude></exclude>
+ </excludes>
+ <fileMode>0644</fileMode>
+ </fileSet>
</fileSets>
</component>
http://git-wip-us.apache.org/repos/asf/phoenix/blob/3fb3bb4d/phoenix-spark/README.md
----------------------------------------------------------------------
diff --git a/phoenix-spark/README.md b/phoenix-spark/README.md
index 1c030f8..1e53c98 100644
--- a/phoenix-spark/README.md
+++ b/phoenix-spark/README.md
@@ -11,7 +11,7 @@ UPSERT INTO TABLE1 (ID, COL1) VALUES (1, 'test_row_1');
UPSERT INTO TABLE1 (ID, COL1) VALUES (2, 'test_row_2');
```
-### Load as a DataFrame
+### Load as a DataFrame using the Data Source API
```scala
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext
@@ -20,15 +20,39 @@ import org.apache.phoenix.spark._
val sc = new SparkContext("local", "phoenix-test")
val sqlContext = new SQLContext(sc)
+val df = sqlContext.load(
+ "org.apache.phoenix.spark",
+ Map("table" -> "TABLE1", "zkUrl" -> "phoenix-server:2181")
+)
+
+df
+ .filter(df("COL1") === "test_row_1" && df("ID") === 1L)
+ .select(df("ID"))
+ .show
+```
+
+### Load as a DataFrame directly using a Configuration object
+```scala
+import org.apache.hadoop.conf.Configuration
+import org.apache.spark.SparkContext
+import org.apache.spark.sql.SQLContext
+import org.apache.phoenix.spark._
+
+val configuration = new Configuration()
+// Can set Phoenix-specific settings, requires 'hbase.zookeeper.quorum'
+
+val sc = new SparkContext("local", "phoenix-test")
+val sqlContext = new SQLContext(sc)
+
// Load the columns 'ID' and 'COL1' from TABLE1 as a DataFrame
val df = sqlContext.phoenixTableAsDataFrame(
- "TABLE1", Array("ID", "COL1"), zkUrl = Some("phoenix-server:2181")
+ "TABLE1", Array("ID", "COL1"), conf = configuration
)
df.show
```
-### Load as an RDD
+### Load as an RDD, using a Zookeeper URL
```scala
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext
@@ -47,7 +71,10 @@ val firstId = rdd1.first()("ID").asInstanceOf[Long]
val firstCol = rdd1.first()("COL1").asInstanceOf[String]
```
-## Saving RDDs to Phoenix
+## Saving RDDs to Phoenix
+
+`saveToPhoenix` is an implicit method on RDD[Product], or an RDD of Tuples. The data types must
+correspond to the Java types Phoenix supports (http://phoenix.apache.org/language/datatypes.html)
Given a Phoenix table with the following DDL
@@ -55,9 +82,6 @@ Given a Phoenix table with the following DDL
CREATE TABLE OUTPUT_TEST_TABLE (id BIGINT NOT NULL PRIMARY KEY, col1 VARCHAR, col2 INTEGER);
```
-`saveToPhoenix` is an implicit method on RDD[Product], or an RDD of Tuples. The data types must
-correspond to the Java types Phoenix supports (http://phoenix.apache.org/language/datatypes.html)
-
```scala
import org.apache.spark.SparkContext
import org.apache.phoenix.spark._
@@ -74,6 +98,38 @@ sc
)
```
+## Saving DataFrames to Phoenix
+
+The `save` method on DataFrame allows passing in a data source type. You can use
+`org.apache.phoenix.spark`, and must also pass in `table` and `zkUrl` parameters to
+specify which table and server to persist the DataFrame to. The column names are derived from
+the DataFrame's schema field names, and must match the Phoenix column names.
+
+The `save` method also takes a `SaveMode` option, for which only `SaveMode.Overwrite` is supported.
+
+Given two Phoenix tables with the following DDL:
+
+```sql
+CREATE TABLE INPUT_TABLE (id BIGINT NOT NULL PRIMARY KEY, col1 VARCHAR, col2 INTEGER);
+CREATE TABLE OUTPUT_TABLE (id BIGINT NOT NULL PRIMARY KEY, col1 VARCHAR, col2 INTEGER);
+```
+
+```scala
+import org.apache.spark.SparkContext
+import org.apache.spark.sql.SQLContext
+import org.apache.phoenix.spark._
+
+// Load INPUT_TABLE
+val sc = new SparkContext("local", "phoenix-test")
+val sqlContext = new SQLContext(sc)
+val df = sqlContext.load("org.apache.phoenix.spark", Map("table" -> "INPUT_TABLE",
+ "zkUrl" -> hbaseConnectionString))
+
+// Save to OUTPUT_TABLE
+df.save("org.apache.phoenix.spark", SaveMode.Overwrite, Map("table" -> "OUTPUT_TABLE",
+ "zkUrl" -> hbaseConnectionString))
+```
+
## Notes
The functions `phoenixTableAsDataFrame`, `phoenixTableAsRDD` and `saveToPhoenix` all support
@@ -85,5 +141,7 @@ in the `conf` parameter. Similarly, if no configuration is passed in, `zkUrl` mu
## Limitations
-- No pushdown predicate support from Spark SQL (yet)
+- Basic support for column and predicate pushdown using the Data Source API
+- The Data Source API does not support passing custom Phoenix settings in configuration; you must
+create the DataFrame or RDD directly if you need fine-grained configuration.
- No support for aggregate or distinct functions (http://phoenix.apache.org/phoenix_mr.html)
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/phoenix/blob/3fb3bb4d/phoenix-spark/pom.xml
----------------------------------------------------------------------
diff --git a/phoenix-spark/pom.xml b/phoenix-spark/pom.xml
index 8b06cf7..adeed88 100644
--- a/phoenix-spark/pom.xml
+++ b/phoenix-spark/pom.xml
@@ -97,12 +97,6 @@
</dependency>
<dependency>
- <groupId>org.xerial.snappy</groupId>
- <artifactId>snappy-java</artifactId>
- <version>1.1.1.6</version>
- </dependency>
-
- <dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${hadoop-two.version}</version>
http://git-wip-us.apache.org/repos/asf/phoenix/blob/3fb3bb4d/phoenix-spark/src/it/resources/setup.sql
----------------------------------------------------------------------
diff --git a/phoenix-spark/src/it/resources/setup.sql b/phoenix-spark/src/it/resources/setup.sql
index ce74c58..40157a2 100644
--- a/phoenix-spark/src/it/resources/setup.sql
+++ b/phoenix-spark/src/it/resources/setup.sql
@@ -15,6 +15,7 @@
-- limitations under the License.
CREATE TABLE table1 (id BIGINT NOT NULL PRIMARY KEY, col1 VARCHAR)
+CREATE TABLE table1_copy (id BIGINT NOT NULL PRIMARY KEY, col1 VARCHAR)
CREATE TABLE table2 (id BIGINT NOT NULL PRIMARY KEY, table1_id BIGINT, "t2col1" VARCHAR)
UPSERT INTO table1 (id, col1) VALUES (1, 'test_row_1')
UPSERT INTO table2 (id, table1_id, "t2col1") VALUES (1, 1, 'test_child_1')
http://git-wip-us.apache.org/repos/asf/phoenix/blob/3fb3bb4d/phoenix-spark/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala
----------------------------------------------------------------------
diff --git a/phoenix-spark/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala b/phoenix-spark/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala
index 149baec..db99f65 100644
--- a/phoenix-spark/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala
+++ b/phoenix-spark/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala
@@ -17,14 +17,14 @@ import java.sql.{Connection, DriverManager}
import java.util.Date
import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.hbase.{HConstants, HBaseTestingUtility}
+import org.apache.hadoop.hbase.{HBaseConfiguration, HConstants, HBaseTestingUtility}
import org.apache.phoenix.end2end.BaseHBaseManagedTimeIT
import org.apache.phoenix.query.BaseTest
import org.apache.phoenix.schema.ColumnNotFoundException
import org.apache.phoenix.schema.types.PVarchar
import org.apache.phoenix.util.ColumnInfo
-import org.apache.spark.sql.SQLContext
-import org.apache.spark.sql.types.{StringType, StructField}
+import org.apache.spark.sql.{SaveMode, execution, SQLContext}
+import org.apache.spark.sql.types.{LongType, DataType, StringType, StructField}
import org.apache.spark.{SparkConf, SparkContext}
import org.joda.time.DateTime
import org.scalatest._
@@ -139,7 +139,10 @@ class PhoenixSparkIT extends FunSuite with Matchers with BeforeAndAfterAll {
df2.registerTempTable("sql_table_2")
- val sqlRdd = sqlContext.sql("SELECT t1.ID, t1.COL1, t2.ID, t2.TABLE1_ID FROM sql_table_1 AS t1 INNER JOIN sql_table_2 AS t2 ON (t2.TABLE1_ID = t1.ID)")
+ val sqlRdd = sqlContext.sql("""
+ |SELECT t1.ID, t1.COL1, t2.ID, t2.TABLE1_ID FROM sql_table_1 AS t1
+ |INNER JOIN sql_table_2 AS t2 ON (t2.TABLE1_ID = t1.ID)""".stripMargin
+ )
val count = sqlRdd.count()
@@ -149,7 +152,9 @@ class PhoenixSparkIT extends FunSuite with Matchers with BeforeAndAfterAll {
test("Can create schema RDD and execute query on case sensitive table (no config)") {
val sqlContext = new SQLContext(sc)
- val df1 = sqlContext.phoenixTableAsDataFrame("table3", Array("id", "col1"), zkUrl = Some(quorumAddress))
+
+ val df1 = sqlContext.phoenixTableAsDataFrame("table3", Array("id", "col1"),
+ zkUrl = Some(quorumAddress))
df1.registerTempTable("table3")
@@ -163,7 +168,8 @@ class PhoenixSparkIT extends FunSuite with Matchers with BeforeAndAfterAll {
test("Can create schema RDD and execute constrained query") {
val sqlContext = new SQLContext(sc)
- val df1 = sqlContext.phoenixTableAsDataFrame("TABLE1", Array("ID", "COL1"), conf = hbaseConfiguration)
+ val df1 = sqlContext.phoenixTableAsDataFrame("TABLE1", Array("ID", "COL1"),
+ conf = hbaseConfiguration)
df1.registerTempTable("sql_table_1")
@@ -173,7 +179,10 @@ class PhoenixSparkIT extends FunSuite with Matchers with BeforeAndAfterAll {
df2.registerTempTable("sql_table_2")
- val sqlRdd = sqlContext.sql("SELECT t1.ID, t1.COL1, t2.ID, t2.TABLE1_ID FROM sql_table_1 AS t1 INNER JOIN sql_table_2 AS t2 ON (t2.TABLE1_ID = t1.ID)")
+ val sqlRdd = sqlContext.sql("""
+ |SELECT t1.ID, t1.COL1, t2.ID, t2.TABLE1_ID FROM sql_table_1 AS t1
+ |INNER JOIN sql_table_2 AS t2 ON (t2.TABLE1_ID = t1.ID)""".stripMargin
+ )
val count = sqlRdd.count()
@@ -194,7 +203,7 @@ class PhoenixSparkIT extends FunSuite with Matchers with BeforeAndAfterAll {
// we have to execute an action before the predicate failure can occur
val count = sqlRdd.count()
- }.getCause shouldBe a [ColumnNotFoundException]
+ }.getCause shouldBe a[ColumnNotFoundException]
}
test("Can create schema RDD with predicate that will never match") {
@@ -216,10 +225,15 @@ class PhoenixSparkIT extends FunSuite with Matchers with BeforeAndAfterAll {
test("Can create schema RDD with complex predicate") {
val sqlContext = new SQLContext(sc)
- val df1 = sqlContext.phoenixTableAsDataFrame("DATE_PREDICATE_TEST_TABLE", Array("ID", "TIMESERIES_KEY"),
- predicate = Some("ID > 0 AND TIMESERIES_KEY BETWEEN CAST(TO_DATE('1990-01-01 00:00:01', 'yyyy-MM-dd HH:mm:ss') AS TIMESTAMP) AND CAST(TO_DATE('1990-01-30 00:00:01', 'yyyy-MM-dd HH:mm:ss') AS TIMESTAMP)"),
+ val df1 = sqlContext.phoenixTableAsDataFrame(
+ "DATE_PREDICATE_TEST_TABLE",
+ Array("ID", "TIMESERIES_KEY"),
+ predicate = Some("""
+ |ID > 0 AND TIMESERIES_KEY BETWEEN
+ |CAST(TO_DATE('1990-01-01 00:00:01', 'yyyy-MM-dd HH:mm:ss') AS TIMESTAMP) AND
+ |CAST(TO_DATE('1990-01-30 00:00:01', 'yyyy-MM-dd HH:mm:ss') AS TIMESTAMP)""".stripMargin),
conf = hbaseConfiguration)
-
+
df1.registerTempTable("date_predicate_test_table")
val sqlRdd = df1.sqlContext.sql("SELECT * FROM date_predicate_test_table")
@@ -248,7 +262,7 @@ class PhoenixSparkIT extends FunSuite with Matchers with BeforeAndAfterAll {
count shouldEqual 1L
}
-
+
test("Can read a table as an RDD") {
val rdd1 = sc.phoenixTableAsRDD("ARRAY_TEST_TABLE", Seq("ID", "VCARRAY"),
conf = hbaseConfiguration)
@@ -271,7 +285,7 @@ class PhoenixSparkIT extends FunSuite with Matchers with BeforeAndAfterAll {
.parallelize(dataSet)
.saveToPhoenix(
"OUTPUT_TEST_TABLE",
- Seq("ID","COL1","COL2"),
+ Seq("ID", "COL1", "COL2"),
hbaseConfiguration
)
@@ -279,7 +293,7 @@ class PhoenixSparkIT extends FunSuite with Matchers with BeforeAndAfterAll {
val stmt = conn.createStatement()
val rs = stmt.executeQuery("SELECT ID, COL1, COL2 FROM OUTPUT_TEST_TABLE")
val results = ListBuffer[(Long, String, Int)]()
- while(rs.next()) {
+ while (rs.next()) {
results.append((rs.getLong(1), rs.getString(2), rs.getInt(3)))
}
@@ -306,7 +320,7 @@ class PhoenixSparkIT extends FunSuite with Matchers with BeforeAndAfterAll {
val stmt = conn.createStatement()
val rs = stmt.executeQuery("SELECT COL3 FROM OUTPUT_TEST_TABLE WHERE ID = 1 OR ID = 2 ORDER BY ID ASC")
val results = ListBuffer[java.sql.Date]()
- while(rs.next()) {
+ while (rs.next()) {
results.append(rs.getDate(1))
}
@@ -315,12 +329,89 @@ class PhoenixSparkIT extends FunSuite with Matchers with BeforeAndAfterAll {
results(1).getTime shouldEqual date.getTime
}
- test("Not specifying a zkUrl or a config quorum URL should fail") {
- intercept[UnsupportedOperationException] {
- val sqlContext = new SQLContext(sc)
- val badConf = new Configuration(hbaseConfiguration)
- badConf.unset(HConstants.ZOOKEEPER_QUORUM)
- sqlContext.phoenixTableAsDataFrame("TABLE1", Array("ID", "COL1"), conf = badConf)
+ test("Can infer schema without defining columns") {
+ val sqlContext = new SQLContext(sc)
+ val df = sqlContext.phoenixTableAsDataFrame("TABLE2", Seq(), conf = hbaseConfiguration)
+ df.schema("ID").dataType shouldEqual LongType
+ df.schema("TABLE1_ID").dataType shouldEqual LongType
+ df.schema("t2col1").dataType shouldEqual StringType
+ }
+
+ test("Spark SQL can use Phoenix as a data source with no schema specified") {
+ val sqlContext = new SQLContext(sc)
+ val df = sqlContext.load("org.apache.phoenix.spark", Map("table" -> "TABLE1",
+ "zkUrl" -> quorumAddress))
+ df.count() shouldEqual 2
+ df.schema("ID").dataType shouldEqual LongType
+ df.schema("COL1").dataType shouldEqual StringType
+ }
+
+ test("Spark SQL can use Phoenix as a data source with PrunedFilteredScan") {
+ val sqlContext = new SQLContext(sc)
+ val df = sqlContext.load("org.apache.phoenix.spark", Map("table" -> "TABLE1",
+ "zkUrl" -> quorumAddress))
+ val res = df.filter(df("COL1") === "test_row_1" && df("ID") === 1L).select(df("ID"))
+
+ // Make sure we got the right value back
+ assert(res.first().getLong(0) == 1L)
+
+ /*
+ NOTE: There doesn't appear to be any way of verifying from the Spark query planner that
+ filtering is being pushed down and done server-side. However, since PhoenixRelation
+ implements PrunedFilteredScan, debugging has shown that both the SELECT columns and WHERE
+ predicates are being passed along to us, which we then forward to Phoenix.
+ TODO: investigate further to find a way to verify server-side pushdown
+ */
+ }
+
+ test("Can persist a dataframe using 'DataFrame.saveToPhoenix'") {
+ // Load from TABLE1
+ val sqlContext = new SQLContext(sc)
+ val df = sqlContext.load("org.apache.phoenix.spark", Map("table" -> "TABLE1",
+ "zkUrl" -> quorumAddress))
+
+ // Save to TABLE1_COPY
+ df.saveToPhoenix("TABLE1_COPY", zkUrl = Some(quorumAddress))
+
+ // Verify results
+ val stmt = conn.createStatement()
+ val rs = stmt.executeQuery("SELECT * FROM TABLE1_COPY")
+
+ val checkResults = List((1L, "test_row_1"), (2, "test_row_2"))
+ val results = ListBuffer[(Long, String)]()
+ while (rs.next()) {
+ results.append((rs.getLong(1), rs.getString(2)))
}
+ stmt.close()
+
+ results.toList shouldEqual checkResults
}
-}
+
+ test("Can persist a dataframe using 'DataFrame.save()") {
+ // Clear TABLE1_COPY
+ var stmt = conn.createStatement()
+ stmt.executeUpdate("DELETE FROM TABLE1_COPY")
+ stmt.close()
+
+ // Load TABLE1, save as TABLE1_COPY
+ val sqlContext = new SQLContext(sc)
+ val df = sqlContext.load("org.apache.phoenix.spark", Map("table" -> "TABLE1",
+ "zkUrl" -> quorumAddress))
+
+ // Save to TABLE1_COPY
+ df.save("org.apache.phoenix.spark", SaveMode.Overwrite, Map("table" -> "TABLE1_COPY", "zkUrl" -> quorumAddress))
+
+ // Verify results
+ stmt = conn.createStatement()
+ val rs = stmt.executeQuery("SELECT * FROM TABLE1_COPY")
+
+ val checkResults = List((1L, "test_row_1"), (2, "test_row_2"))
+ val results = ListBuffer[(Long, String)]()
+ while (rs.next()) {
+ results.append((rs.getLong(1), rs.getString(2)))
+ }
+ stmt.close()
+
+ results.toList shouldEqual checkResults
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/phoenix/blob/3fb3bb4d/phoenix-spark/src/main/scala/org/apache/phoenix/spark/ConfigurationUtil.scala
----------------------------------------------------------------------
diff --git a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/ConfigurationUtil.scala b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/ConfigurationUtil.scala
new file mode 100644
index 0000000..c0c7248
--- /dev/null
+++ b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/ConfigurationUtil.scala
@@ -0,0 +1,65 @@
+/*
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ */
+package org.apache.phoenix.spark
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.hbase.{HBaseConfiguration, HConstants}
+import org.apache.phoenix.mapreduce.util.{ColumnInfoToStringEncoderDecoder, PhoenixConfigurationUtil}
+import org.apache.phoenix.util.ColumnInfo
+import scala.collection.JavaConversions._
+
+object ConfigurationUtil extends Serializable {
+
+ def getOutputConfiguration(tableName: String, columns: Seq[String], zkUrl: Option[String], conf: Option[Configuration]): Configuration = {
+
+ // Create an HBaseConfiguration object from the passed in config, if present
+ val config = conf match {
+ case Some(c) => HBaseConfiguration.create(c)
+ case _ => HBaseConfiguration.create()
+ }
+
+ // Set the table to save to
+ PhoenixConfigurationUtil.setOutputTableName(config, tableName)
+
+ // Infer column names from the DataFrame schema
+ PhoenixConfigurationUtil.setUpsertColumnNames(config, columns.mkString(","))
+
+ // Override the Zookeeper URL if present. Throw exception if no address given.
+ zkUrl match {
+ case Some(url) => config.set(HConstants.ZOOKEEPER_QUORUM, url )
+ case _ => {
+ if(config.get(HConstants.ZOOKEEPER_QUORUM) == null) {
+ throw new UnsupportedOperationException(
+ s"One of zkUrl or '${HConstants.ZOOKEEPER_QUORUM}' config property must be provided"
+ )
+ }
+ }
+ }
+
+ // Return the configuration object
+ config
+ }
+
+ // Return a serializable representation of the columns
+ def encodeColumns(conf: Configuration): String = {
+ ColumnInfoToStringEncoderDecoder.encode(
+ PhoenixConfigurationUtil.getUpsertColumnMetadataList(conf)
+ )
+ }
+
+ // Decode the columns to a list of ColumnInfo objects
+ def decodeColumns(encodedColumns: String): List[ColumnInfo] = {
+ ColumnInfoToStringEncoderDecoder.decode(encodedColumns).toList
+ }
+}
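
The new ConfigurationUtil centralizes the output-side setup that was previously inlined in ProductRDDFunctions. A hedged usage sketch follows (the table, columns, and quorum address are illustrative; `encodeColumns` needs a reachable cluster, since the upsert column metadata is fetched from Phoenix):

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.phoenix.spark.ConfigurationUtil

// Build an output configuration; an explicit zkUrl overrides hbase.zookeeper.quorum,
// and an UnsupportedOperationException is thrown if neither is available.
val outputConf = ConfigurationUtil.getOutputConfiguration(
  "OUTPUT_TEST_TABLE",
  Seq("ID", "COL1", "COL2"),
  Some("phoenix-server:2181"),
  Some(new Configuration())
)

// Encode the upsert column metadata into a String that can be shipped to executors
// and decoded inside PhoenixRecordWritable.write()
val encodedColumns = ConfigurationUtil.encodeColumns(outputConf)
```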
http://git-wip-us.apache.org/repos/asf/phoenix/blob/3fb3bb4d/phoenix-spark/src/main/scala/org/apache/phoenix/spark/DataFrameFunctions.scala
----------------------------------------------------------------------
diff --git a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/DataFrameFunctions.scala b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/DataFrameFunctions.scala
new file mode 100644
index 0000000..e17d7a5
--- /dev/null
+++ b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/DataFrameFunctions.scala
@@ -0,0 +1,51 @@
+/*
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ */
+package org.apache.phoenix.spark
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.hbase.HConstants
+import org.apache.hadoop.io.NullWritable
+import org.apache.phoenix.mapreduce.PhoenixOutputFormat
+import org.apache.phoenix.mapreduce.util.{ColumnInfoToStringEncoderDecoder, PhoenixConfigurationUtil}
+import org.apache.spark.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.DataFrame
+
+class DataFrameFunctions(data: DataFrame) extends Logging with Serializable {
+
+ def saveToPhoenix(tableName: String, conf: Configuration = new Configuration,
+ zkUrl: Option[String] = None): Unit = {
+
+ val config = ConfigurationUtil.getOutputConfiguration(tableName, data.schema.fieldNames, zkUrl, Some(conf))
+
+ // Encode the column info to a serializable type
+ val encodedColumns = ConfigurationUtil.encodeColumns(config)
+
+ // Map the row object into a PhoenixRecordWritable
+ val phxRDD: RDD[(NullWritable, PhoenixRecordWritable)] = data.map { row =>
+ val rec = new PhoenixRecordWritable(encodedColumns)
+ row.toSeq.foreach { e => rec.add(e) }
+ (null, rec)
+ }
+
+ // Save it
+ phxRDD.saveAsNewAPIHadoopFile(
+ "",
+ classOf[NullWritable],
+ classOf[PhoenixRecordWritable],
+ classOf[PhoenixOutputFormat[PhoenixRecordWritable]],
+ config
+ )
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/phoenix/blob/3fb3bb4d/phoenix-spark/src/main/scala/org/apache/phoenix/spark/DefaultSource.scala
----------------------------------------------------------------------
diff --git a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/DefaultSource.scala b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/DefaultSource.scala
new file mode 100644
index 0000000..b0e9754
--- /dev/null
+++ b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/DefaultSource.scala
@@ -0,0 +1,41 @@
+package org.apache.phoenix.spark
+
+import org.apache.spark.sql.{SaveMode, DataFrame, SQLContext}
+import org.apache.spark.sql.sources.{CreatableRelationProvider, BaseRelation, RelationProvider}
+import org.apache.phoenix.spark._
+
+class DefaultSource extends RelationProvider with CreatableRelationProvider {
+
+ // Override 'RelationProvider.createRelation', this enables DataFrame.load()
+ override def createRelation(sqlContext: SQLContext, parameters: Map[String, String]): BaseRelation = {
+ verifyParameters(parameters)
+
+ new PhoenixRelation(
+ parameters("table"),
+ parameters("zkUrl")
+ )(sqlContext)
+ }
+
+ // Override 'CreatableRelationProvider.createRelation', this enables DataFrame.save()
+ override def createRelation(sqlContext: SQLContext, mode: SaveMode,
+ parameters: Map[String, String], data: DataFrame): BaseRelation = {
+
+ if (!mode.equals(SaveMode.Overwrite)) {
+ throw new Exception("SaveMode other than SaveMode.Overwrite is not supported")
+ }
+
+ verifyParameters(parameters)
+
+ // Save the DataFrame to Phoenix
+ data.saveToPhoenix(parameters("table"), zkUrl = parameters.get("zkUrl"))
+
+ // Return a relation of the saved data
+ createRelation(sqlContext, parameters)
+ }
+
+ // Ensure the required parameters are present
+ def verifyParameters(parameters: Map[String, String]): Unit = {
+ if (parameters.get("table").isEmpty) throw new RuntimeException("No Phoenix 'table' option defined")
+ if (parameters.get("zkUrl").isEmpty) throw new RuntimeException("No Phoenix 'zkUrl' option defined")
+ }
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/3fb3bb4d/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRDD.scala
----------------------------------------------------------------------
diff --git a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRDD.scala b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRDD.scala
index b27f9f9..9a359e3 100644
--- a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRDD.scala
+++ b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRDD.scala
@@ -14,7 +14,7 @@
package org.apache.phoenix.spark
import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.hbase.HConstants
+import org.apache.hadoop.hbase.{HBaseConfiguration, HConstants}
import org.apache.hadoop.io.NullWritable
import org.apache.phoenix.mapreduce.PhoenixInputFormat
import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil
@@ -65,12 +65,12 @@ class PhoenixRDD(sc: SparkContext, table: String, columns: Seq[String],
def buildSql(table: String, columns: Seq[String], predicate: Option[String]): String = {
val query = "SELECT %s FROM \"%s\"".format(
- columns.map(f => "\"" + f + "\"").mkString(", "),
+ if (columns.isEmpty) "*" else columns.map(f => "\"" + f + "\"").mkString(", "),
table
)
query + (predicate match {
- case Some(p: String) => " WHERE " + p
+ case Some(p: String) if p.length > 0 => " WHERE " + p
case _ => ""
})
}
@@ -79,10 +79,12 @@ class PhoenixRDD(sc: SparkContext, table: String, columns: Seq[String],
// This is just simply not serializable, so don't try, but clone it because
// PhoenixConfigurationUtil mutates it.
- val config = new Configuration(conf)
+ val config = HBaseConfiguration.create(conf)
PhoenixConfigurationUtil.setInputQuery(config, buildSql(table, columns, predicate))
- PhoenixConfigurationUtil.setSelectColumnNames(config, columns.mkString(","))
+ if(!columns.isEmpty) {
+ PhoenixConfigurationUtil.setSelectColumnNames(config, columns.mkString(","))
+ }
PhoenixConfigurationUtil.setInputTableName(config, "\"" + table + "\"")
PhoenixConfigurationUtil.setInputClass(config, classOf[PhoenixRecordWritable])
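
To illustrate the buildSql change, a minimal standalone sketch of the new behavior (the method body copied out of PhoenixRDD so it can run without a cluster): an empty column list now selects `*`, and an empty or missing predicate no longer appends a `WHERE` clause.

```scala
object BuildSqlSketch {
  // Mirrors PhoenixRDD.buildSql after this change
  def buildSql(table: String, columns: Seq[String], predicate: Option[String]): String = {
    val query = "SELECT %s FROM \"%s\"".format(
      if (columns.isEmpty) "*" else columns.map(f => "\"" + f + "\"").mkString(", "),
      table
    )
    query + (predicate match {
      case Some(p: String) if p.length > 0 => " WHERE " + p
      case _ => ""
    })
  }

  def main(args: Array[String]): Unit = {
    // Prints: SELECT * FROM "TABLE1"
    println(buildSql("TABLE1", Seq(), None))
    // Prints: SELECT "ID", "COL1" FROM "TABLE1" WHERE ID > 1
    println(buildSql("TABLE1", Seq("ID", "COL1"), Some("ID > 1")))
  }
}
```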
http://git-wip-us.apache.org/repos/asf/phoenix/blob/3fb3bb4d/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRecordWritable.scala
----------------------------------------------------------------------
diff --git a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRecordWritable.scala b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRecordWritable.scala
index 48a70ec..67e0bd2 100644
--- a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRecordWritable.scala
+++ b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRecordWritable.scala
@@ -31,7 +31,7 @@ class PhoenixRecordWritable(var encodedColumns: String) extends DBWritable {
override def write(statement: PreparedStatement): Unit = {
// Decode the ColumnInfo list
- val columns = ColumnInfoToStringEncoderDecoder.decode(encodedColumns).toList
+ val columns = ConfigurationUtil.decodeColumns(encodedColumns)
// Make sure we at least line up in size
if(upsertValues.length != columns.length) {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/3fb3bb4d/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRelation.scala
----------------------------------------------------------------------
diff --git a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRelation.scala b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRelation.scala
new file mode 100644
index 0000000..4177022
--- /dev/null
+++ b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRelation.scala
@@ -0,0 +1,80 @@
+package org.apache.phoenix.spark
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.{Row, SQLContext}
+import org.apache.spark.sql.sources._
+import org.apache.commons.lang.StringEscapeUtils.escapeSql
+
+case class PhoenixRelation(tableName: String, zkUrl: String)(@transient val sqlContext: SQLContext)
+ extends BaseRelation with PrunedFilteredScan {
+
+ /*
+ This is the buildScan() implementing Spark's PrunedFilteredScan.
+ Spark SQL queries with columns or predicates specified will be pushed down
+ to us here, and we can pass that on to Phoenix. According to the docs, this
+ is an optimization, and the filtering/pruning will be re-evaluated again,
+ but this prevents having to load the whole table into Spark first.
+ */
+ override def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = {
+ new PhoenixRDD(
+ sqlContext.sparkContext,
+ tableName,
+ requiredColumns,
+ Some(buildFilter(filters)),
+ Some(zkUrl),
+ new Configuration()
+ ).toDataFrame(sqlContext).rdd
+ }
+
+ // Required by BaseRelation, this will return the full schema for a table
+ override def schema: StructType = {
+ new PhoenixRDD(
+ sqlContext.sparkContext,
+ tableName,
+ Seq(),
+ None,
+ Some(zkUrl),
+ new Configuration()
+ ).toDataFrame(sqlContext).schema
+ }
+
+ // Attempt to create Phoenix-accepted WHERE clauses from Spark filters,
+ // mostly inspired from Spark SQL JDBCRDD and the couchbase-spark-connector
+ private def buildFilter(filters: Array[Filter]): String = {
+ if (filters.isEmpty) {
+ return ""
+ }
+
+ val filter = new StringBuilder("")
+ var i = 0
+
+ filters.foreach(f => {
+ if (i > 0) {
+ filter.append(" AND")
+ }
+
+ f match {
+ case EqualTo(attr, value) => filter.append(s" $attr = ${compileValue(value)}")
+ case GreaterThan(attr, value) => filter.append(s" $attr > ${compileValue(value)}")
+ case GreaterThanOrEqual(attr, value) => filter.append(s" $attr >= ${compileValue(value)}")
+ case LessThan(attr, value) => filter.append(s" $attr < ${compileValue(value)}")
+ case LessThanOrEqual(attr, value) => filter.append(s" $attr <= ${compileValue(value)}")
+ case IsNull(attr) => filter.append(s" $attr IS NULL")
+ case IsNotNull(attr) => filter.append(s" $attr IS NOT NULL")
+ case _ => throw new Exception("Unsupported filter")
+ }
+
+ i = i + 1
+ })
+
+ filter.toString()
+ }
+
+ // Helper function to escape string values in SQL queries
+ private def compileValue(value: Any): Any = value match {
+ case stringValue: String => s"'${escapeSql(stringValue)}'"
+ case _ => value
+ }
+}
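
For reference, a hedged illustration of the filter translation above (the column names are examples; the filter classes are Spark's Data Source API filters from `org.apache.spark.sql.sources`):

```scala
import org.apache.spark.sql.sources.{EqualTo, Filter, GreaterThan, IsNotNull}

// Given these pushed-down filters, buildFilter produces the clause
//   " COL1 = 'test_row_1' AND ID > 1 AND COL2 IS NOT NULL"
// (string values are quoted and SQL-escaped by compileValue), which PhoenixRDD
// then appends after WHERE in the generated SELECT statement.
val pushedFilters: Array[Filter] = Array(
  EqualTo("COL1", "test_row_1"),
  GreaterThan("ID", 1L),
  IsNotNull("COL2")
)
```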
http://git-wip-us.apache.org/repos/asf/phoenix/blob/3fb3bb4d/phoenix-spark/src/main/scala/org/apache/phoenix/spark/ProductRDDFunctions.scala
----------------------------------------------------------------------
diff --git a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/ProductRDDFunctions.scala b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/ProductRDDFunctions.scala
index 2926569..3d24fb9 100644
--- a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/ProductRDDFunctions.scala
+++ b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/ProductRDDFunctions.scala
@@ -27,27 +27,10 @@ class ProductRDDFunctions[A <: Product](data: RDD[A]) extends Logging with Seria
conf: Configuration = new Configuration, zkUrl: Option[String] = None)
: Unit = {
- // Setup Phoenix output configuration, make a local copy
- val config = new Configuration(conf)
- PhoenixConfigurationUtil.setOutputTableName(config, tableName)
- PhoenixConfigurationUtil.setUpsertColumnNames(config, cols.mkString(","))
-
- // Override the Zookeeper URL if present. Throw exception if no address given.
- zkUrl match {
- case Some(url) => config.set(HConstants.ZOOKEEPER_QUORUM, url )
- case _ => {
- if(config.get(HConstants.ZOOKEEPER_QUORUM) == null) {
- throw new UnsupportedOperationException(
- s"One of zkUrl or '${HConstants.ZOOKEEPER_QUORUM}' config property must be provided"
- )
- }
- }
- }
+ val config = ConfigurationUtil.getOutputConfiguration(tableName, cols, zkUrl, Some(conf))
// Encode the column info to a serializable type
- val encodedColumns = ColumnInfoToStringEncoderDecoder.encode(
- PhoenixConfigurationUtil.getUpsertColumnMetadataList(config)
- )
+ val encodedColumns = ConfigurationUtil.encodeColumns(config)
// Map each element of the product to a new (NullWritable, PhoenixRecordWritable)
val phxRDD: RDD[(NullWritable, PhoenixRecordWritable)] = data.map { e =>
http://git-wip-us.apache.org/repos/asf/phoenix/blob/3fb3bb4d/phoenix-spark/src/main/scala/org/apache/phoenix/spark/package.scala
----------------------------------------------------------------------
diff --git a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/package.scala b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/package.scala
index c19ec16..3fed79e 100644
--- a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/package.scala
+++ b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/package.scala
@@ -15,7 +15,7 @@ package org.apache.phoenix
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.SQLContext
+import org.apache.spark.sql.{DataFrame, SQLContext}
package object spark {
implicit def toProductRDDFunctions[A <: Product](rdd: RDD[A]): ProductRDDFunctions[A] = {
@@ -29,4 +29,8 @@ package object spark {
implicit def toSparkSqlContextFunctions(sqlContext: SQLContext): SparkSqlContextFunctions = {
new SparkSqlContextFunctions(sqlContext)
}
+
+ implicit def toDataFrameFunctions(data: DataFrame): DataFrameFunctions = {
+ new DataFrameFunctions(data)
+ }
}
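
With the new `toDataFrameFunctions` implicit in scope, `saveToPhoenix` becomes available directly on a DataFrame, mirroring the existing RDD support. A hedged usage sketch (the table names and zkUrl are illustrative, following the integration tests above):

```scala
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext
import org.apache.phoenix.spark._

val sc = new SparkContext("local", "phoenix-test")
val sqlContext = new SQLContext(sc)

// Load TABLE1 through the Data Source API...
val df = sqlContext.load("org.apache.phoenix.spark",
  Map("table" -> "TABLE1", "zkUrl" -> "phoenix-server:2181"))

// ...and persist it via the implicit DataFrameFunctions.saveToPhoenix
df.saveToPhoenix("TABLE1_COPY", zkUrl = Some("phoenix-server:2181"))
```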
http://git-wip-us.apache.org/repos/asf/phoenix/blob/3fb3bb4d/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index b81dfb5..977218d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -428,6 +428,11 @@
<artifactId>phoenix-pig</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.phoenix</groupId>
+ <artifactId>phoenix-spark</artifactId>
+ <version>${project.version}</version>
+ </dependency>
<!-- HBase dependencies -->
<dependency>