Posted to commits@phoenix.apache.org by ma...@apache.org on 2015/04/16 16:32:30 UTC

[49/50] [abbrv] phoenix git commit: PHOENIX-1815 - Spark Datasource api

PHOENIX-1815 - Spark Datasource api


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/3fb3bb4d
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/3fb3bb4d
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/3fb3bb4d

Branch: refs/heads/calcite
Commit: 3fb3bb4d2231972dc06326251b76cc1431da7386
Parents: e1bbb94
Author: ravimagham <ra...@apache.org>
Authored: Wed Apr 15 19:03:33 2015 -0700
Committer: ravimagham <ra...@apache.org>
Committed: Wed Apr 15 19:03:33 2015 -0700

----------------------------------------------------------------------
 phoenix-assembly/pom.xml                        |   6 +-
 .../src/build/components/all-common-jars.xml    |  11 ++
 phoenix-spark/README.md                         |  74 ++++++++--
 phoenix-spark/pom.xml                           |   6 -
 phoenix-spark/src/it/resources/setup.sql        |   1 +
 .../apache/phoenix/spark/PhoenixSparkIT.scala   | 135 ++++++++++++++++---
 .../phoenix/spark/ConfigurationUtil.scala       |  65 +++++++++
 .../phoenix/spark/DataFrameFunctions.scala      |  51 +++++++
 .../apache/phoenix/spark/DefaultSource.scala    |  41 ++++++
 .../org/apache/phoenix/spark/PhoenixRDD.scala   |  12 +-
 .../phoenix/spark/PhoenixRecordWritable.scala   |   2 +-
 .../apache/phoenix/spark/PhoenixRelation.scala  |  80 +++++++++++
 .../phoenix/spark/ProductRDDFunctions.scala     |  21 +--
 .../org/apache/phoenix/spark/package.scala      |   6 +-
 pom.xml                                         |   5 +
 15 files changed, 453 insertions(+), 63 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/3fb3bb4d/phoenix-assembly/pom.xml
----------------------------------------------------------------------
diff --git a/phoenix-assembly/pom.xml b/phoenix-assembly/pom.xml
index 51f767f..b3a992e 100644
--- a/phoenix-assembly/pom.xml
+++ b/phoenix-assembly/pom.xml
@@ -142,9 +142,13 @@
       <groupId>org.apache.phoenix</groupId>
       <artifactId>phoenix-flume</artifactId>
     </dependency>
-        <dependency>
+    <dependency>
       <groupId>org.apache.phoenix</groupId>
       <artifactId>phoenix-pig</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.apache.phoenix</groupId>
+      <artifactId>phoenix-spark</artifactId>
+    </dependency>
   </dependencies>
 </project>

http://git-wip-us.apache.org/repos/asf/phoenix/blob/3fb3bb4d/phoenix-assembly/src/build/components/all-common-jars.xml
----------------------------------------------------------------------
diff --git a/phoenix-assembly/src/build/components/all-common-jars.xml b/phoenix-assembly/src/build/components/all-common-jars.xml
index ce6da59..769e28f 100644
--- a/phoenix-assembly/src/build/components/all-common-jars.xml
+++ b/phoenix-assembly/src/build/components/all-common-jars.xml
@@ -71,5 +71,16 @@
       </excludes>
       <fileMode>0644</fileMode>
     </fileSet>
+    <fileSet>
+      <directory>${project.basedir}/../phoenix-spark/target/</directory>
+      <outputDirectory>lib</outputDirectory>
+      <includes>
+          <include>phoenix-*.jar</include>
+      </includes>
+      <excludes>
+          <exclude></exclude>
+      </excludes>
+      <fileMode>0644</fileMode>
+    </fileSet>
   </fileSets>
 </component>

http://git-wip-us.apache.org/repos/asf/phoenix/blob/3fb3bb4d/phoenix-spark/README.md
----------------------------------------------------------------------
diff --git a/phoenix-spark/README.md b/phoenix-spark/README.md
index 1c030f8..1e53c98 100644
--- a/phoenix-spark/README.md
+++ b/phoenix-spark/README.md
@@ -11,7 +11,7 @@ UPSERT INTO TABLE1 (ID, COL1) VALUES (1, 'test_row_1');
 UPSERT INTO TABLE1 (ID, COL1) VALUES (2, 'test_row_2');
 ```
 
-### Load as a DataFrame
+### Load as a DataFrame using the Data Source API
 ```scala
 import org.apache.spark.SparkContext
 import org.apache.spark.sql.SQLContext
@@ -20,15 +20,39 @@ import org.apache.phoenix.spark._
 val sc = new SparkContext("local", "phoenix-test")
 val sqlContext = new SQLContext(sc)
 
+val df = sqlContext.load(
+  "org.apache.phoenix.spark", 
+  Map("table" -> "TABLE1", "zkUrl" -> "phoenix-server:2181")
+)
+
+df
+  .filter(df("COL1") === "test_row_1" && df("ID") === 1L)
+  .select(df("ID"))
+  .show
+```
+
+### Load as a DataFrame directly using a Configuration object
+```scala
+import org.apache.hadoop.conf.Configuration
+import org.apache.spark.SparkContext
+import org.apache.spark.sql.SQLContext
+import org.apache.phoenix.spark._
+
+val configuration = new Configuration()
+// Can set Phoenix-specific settings, requires 'hbase.zookeeper.quorum'
+
+val sc = new SparkContext("local", "phoenix-test")
+val sqlContext = new SQLContext(sc)
+
 // Load the columns 'ID' and 'COL1' from TABLE1 as a DataFrame
 val df = sqlContext.phoenixTableAsDataFrame(
-  "TABLE1", Array("ID", "COL1"), zkUrl = Some("phoenix-server:2181")
+  "TABLE1", Array("ID", "COL1"), conf = configuration
 )
 
 df.show
 ```
 
-### Load as an RDD
+### Load as an RDD, using a Zookeeper URL
 ```scala
 import org.apache.spark.SparkContext
 import org.apache.spark.sql.SQLContext
@@ -47,7 +71,10 @@ val firstId = rdd1.first()("ID").asInstanceOf[Long]
 val firstCol = rdd1.first()("COL1").asInstanceOf[String]
 ```
 
-## Saving RDDs to Phoenix
+## Saving RDDs to Phoenix 
+
+`saveToPhoenix` is an implicit method on RDD[Product], or an RDD of Tuples. The data types must
+correspond to the Java types Phoenix supports (http://phoenix.apache.org/language/datatypes.html)
 
 Given a Phoenix table with the following DDL
 
@@ -55,9 +82,6 @@ Given a Phoenix table with the following DDL
 CREATE TABLE OUTPUT_TEST_TABLE (id BIGINT NOT NULL PRIMARY KEY, col1 VARCHAR, col2 INTEGER);
 ```
 
-`saveToPhoenix` is an implicit method on RDD[Product], or an RDD of Tuples. The data types must
-correspond to the Java types Phoenix supports (http://phoenix.apache.org/language/datatypes.html)
-
 ```scala
 import org.apache.spark.SparkContext
 import org.apache.phoenix.spark._
@@ -74,6 +98,38 @@ sc
   )
 ```
 
+## Saving DataFrames to Phoenix
+
+The `save` method on DataFrame allows passing in a data source type. You can use
+`org.apache.phoenix.spark`, and must also pass in `table` and `zkUrl` parameters to
+specify which table and server to persist the DataFrame to. The column names are derived from
+the DataFrame's schema field names, and must match the Phoenix column names.
+
+The `save` method also takes a `SaveMode` option, for which only `SaveMode.Overwrite` is supported.
+
+Given two Phoenix tables with the following DDL:
+
+```sql
+CREATE TABLE INPUT_TABLE (id BIGINT NOT NULL PRIMARY KEY, col1 VARCHAR, col2 INTEGER);
+CREATE TABLE OUTPUT_TABLE (id BIGINT NOT NULL PRIMARY KEY, col1 VARCHAR, col2 INTEGER);
+```
+
+```scala
+import org.apache.spark.SparkContext
+import org.apache.spark.sql.{SQLContext, SaveMode}
+import org.apache.phoenix.spark._
+
+// Load INPUT_TABLE
+val sc = new SparkContext("local", "phoenix-test")
+val sqlContext = new SQLContext(sc)
+val df = sqlContext.load("org.apache.phoenix.spark", Map("table" -> "INPUT_TABLE",
+  "zkUrl" -> hbaseConnectionString))
+
+// Save to OUTPUT_TABLE
+df.save("org.apache.phoenix.spark", SaveMode.Overwrite, Map("table" -> "OUTPUT_TABLE", 
+  "zkUrl" -> hbaseConnectionString))
+```
+
 ## Notes
 
 The functions `phoenixTableAsDataFrame`, `phoenixTableAsRDD` and `saveToPhoenix` all support
@@ -85,5 +141,7 @@ in the `conf` parameter. Similarly, if no configuration is passed in, `zkUrl` mu
 
 ## Limitations
 
-- No pushdown predicate support from Spark SQL (yet)
+- Basic support for column and predicate pushdown using the Data Source API
+- The Data Source API does not support passing custom Phoenix settings in configuration; you must
+create the DataFrame or RDD directly if you need fine-grained configuration.
 - No support for aggregate or distinct functions (http://phoenix.apache.org/phoenix_mr.html)
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/3fb3bb4d/phoenix-spark/pom.xml
----------------------------------------------------------------------
diff --git a/phoenix-spark/pom.xml b/phoenix-spark/pom.xml
index 8b06cf7..adeed88 100644
--- a/phoenix-spark/pom.xml
+++ b/phoenix-spark/pom.xml
@@ -97,12 +97,6 @@
     </dependency>
 
     <dependency>
-      <groupId>org.xerial.snappy</groupId>
-      <artifactId>snappy-java</artifactId>
-      <version>1.1.1.6</version>
-    </dependency>
-
-    <dependency>
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-client</artifactId>
       <version>${hadoop-two.version}</version>

http://git-wip-us.apache.org/repos/asf/phoenix/blob/3fb3bb4d/phoenix-spark/src/it/resources/setup.sql
----------------------------------------------------------------------
diff --git a/phoenix-spark/src/it/resources/setup.sql b/phoenix-spark/src/it/resources/setup.sql
index ce74c58..40157a2 100644
--- a/phoenix-spark/src/it/resources/setup.sql
+++ b/phoenix-spark/src/it/resources/setup.sql
@@ -15,6 +15,7 @@
 -- limitations under the License.
 
 CREATE TABLE table1 (id BIGINT NOT NULL PRIMARY KEY, col1 VARCHAR)
+CREATE TABLE table1_copy (id BIGINT NOT NULL PRIMARY KEY, col1 VARCHAR)
 CREATE TABLE table2 (id BIGINT NOT NULL PRIMARY KEY, table1_id BIGINT, "t2col1" VARCHAR)
 UPSERT INTO table1 (id, col1) VALUES (1, 'test_row_1')
 UPSERT INTO table2 (id, table1_id, "t2col1") VALUES (1, 1, 'test_child_1')

http://git-wip-us.apache.org/repos/asf/phoenix/blob/3fb3bb4d/phoenix-spark/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala
----------------------------------------------------------------------
diff --git a/phoenix-spark/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala b/phoenix-spark/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala
index 149baec..db99f65 100644
--- a/phoenix-spark/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala
+++ b/phoenix-spark/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala
@@ -17,14 +17,14 @@ import java.sql.{Connection, DriverManager}
 import java.util.Date
 
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.hbase.{HConstants, HBaseTestingUtility}
+import org.apache.hadoop.hbase.{HBaseConfiguration, HConstants, HBaseTestingUtility}
 import org.apache.phoenix.end2end.BaseHBaseManagedTimeIT
 import org.apache.phoenix.query.BaseTest
 import org.apache.phoenix.schema.ColumnNotFoundException
 import org.apache.phoenix.schema.types.PVarchar
 import org.apache.phoenix.util.ColumnInfo
-import org.apache.spark.sql.SQLContext
-import org.apache.spark.sql.types.{StringType, StructField}
+import org.apache.spark.sql.{SaveMode, execution, SQLContext}
+import org.apache.spark.sql.types.{LongType, DataType, StringType, StructField}
 import org.apache.spark.{SparkConf, SparkContext}
 import org.joda.time.DateTime
 import org.scalatest._
@@ -139,7 +139,10 @@ class PhoenixSparkIT extends FunSuite with Matchers with BeforeAndAfterAll {
 
     df2.registerTempTable("sql_table_2")
 
-    val sqlRdd = sqlContext.sql("SELECT t1.ID, t1.COL1, t2.ID, t2.TABLE1_ID FROM sql_table_1 AS t1 INNER JOIN sql_table_2 AS t2 ON (t2.TABLE1_ID = t1.ID)")
+    val sqlRdd = sqlContext.sql("""
+        |SELECT t1.ID, t1.COL1, t2.ID, t2.TABLE1_ID FROM sql_table_1 AS t1
+        |INNER JOIN sql_table_2 AS t2 ON (t2.TABLE1_ID = t1.ID)""".stripMargin
+    )
 
     val count = sqlRdd.count()
 
@@ -149,7 +152,9 @@ class PhoenixSparkIT extends FunSuite with Matchers with BeforeAndAfterAll {
   test("Can create schema RDD and execute query on case sensitive table (no config)") {
     val sqlContext = new SQLContext(sc)
 
-    val df1 = sqlContext.phoenixTableAsDataFrame("table3", Array("id", "col1"), zkUrl = Some(quorumAddress))
+
+    val df1 = sqlContext.phoenixTableAsDataFrame("table3", Array("id", "col1"),
+      zkUrl = Some(quorumAddress))
 
     df1.registerTempTable("table3")
 
@@ -163,7 +168,8 @@ class PhoenixSparkIT extends FunSuite with Matchers with BeforeAndAfterAll {
   test("Can create schema RDD and execute constrained query") {
     val sqlContext = new SQLContext(sc)
 
-    val df1 = sqlContext.phoenixTableAsDataFrame("TABLE1", Array("ID", "COL1"), conf = hbaseConfiguration)
+    val df1 = sqlContext.phoenixTableAsDataFrame("TABLE1", Array("ID", "COL1"),
+      conf = hbaseConfiguration)
 
     df1.registerTempTable("sql_table_1")
 
@@ -173,7 +179,10 @@ class PhoenixSparkIT extends FunSuite with Matchers with BeforeAndAfterAll {
 
     df2.registerTempTable("sql_table_2")
 
-    val sqlRdd = sqlContext.sql("SELECT t1.ID, t1.COL1, t2.ID, t2.TABLE1_ID FROM sql_table_1 AS t1 INNER JOIN sql_table_2 AS t2 ON (t2.TABLE1_ID = t1.ID)")
+    val sqlRdd = sqlContext.sql("""
+      |SELECT t1.ID, t1.COL1, t2.ID, t2.TABLE1_ID FROM sql_table_1 AS t1
+      |INNER JOIN sql_table_2 AS t2 ON (t2.TABLE1_ID = t1.ID)""".stripMargin
+    )
 
     val count = sqlRdd.count()
 
@@ -194,7 +203,7 @@ class PhoenixSparkIT extends FunSuite with Matchers with BeforeAndAfterAll {
 
       // we have to execute an action before the predicate failure can occur
       val count = sqlRdd.count()
-    }.getCause shouldBe a [ColumnNotFoundException]
+    }.getCause shouldBe a[ColumnNotFoundException]
   }
 
   test("Can create schema RDD with predicate that will never match") {
@@ -216,10 +225,15 @@ class PhoenixSparkIT extends FunSuite with Matchers with BeforeAndAfterAll {
   test("Can create schema RDD with complex predicate") {
     val sqlContext = new SQLContext(sc)
 
-    val df1 = sqlContext.phoenixTableAsDataFrame("DATE_PREDICATE_TEST_TABLE", Array("ID", "TIMESERIES_KEY"),
-      predicate = Some("ID > 0 AND TIMESERIES_KEY BETWEEN CAST(TO_DATE('1990-01-01 00:00:01', 'yyyy-MM-dd HH:mm:ss') AS TIMESTAMP) AND CAST(TO_DATE('1990-01-30 00:00:01', 'yyyy-MM-dd HH:mm:ss') AS TIMESTAMP)"),
+    val df1 = sqlContext.phoenixTableAsDataFrame(
+      "DATE_PREDICATE_TEST_TABLE",
+      Array("ID", "TIMESERIES_KEY"),
+      predicate = Some("""
+        |ID > 0 AND TIMESERIES_KEY BETWEEN
+        |CAST(TO_DATE('1990-01-01 00:00:01', 'yyyy-MM-dd HH:mm:ss') AS TIMESTAMP) AND
+        |CAST(TO_DATE('1990-01-30 00:00:01', 'yyyy-MM-dd HH:mm:ss') AS TIMESTAMP)""".stripMargin),
       conf = hbaseConfiguration)
-    
+
     df1.registerTempTable("date_predicate_test_table")
 
     val sqlRdd = df1.sqlContext.sql("SELECT * FROM date_predicate_test_table")
@@ -248,7 +262,7 @@ class PhoenixSparkIT extends FunSuite with Matchers with BeforeAndAfterAll {
 
     count shouldEqual 1L
   }
-  
+
   test("Can read a table as an RDD") {
     val rdd1 = sc.phoenixTableAsRDD("ARRAY_TEST_TABLE", Seq("ID", "VCARRAY"),
       conf = hbaseConfiguration)
@@ -271,7 +285,7 @@ class PhoenixSparkIT extends FunSuite with Matchers with BeforeAndAfterAll {
       .parallelize(dataSet)
       .saveToPhoenix(
         "OUTPUT_TEST_TABLE",
-        Seq("ID","COL1","COL2"),
+        Seq("ID", "COL1", "COL2"),
         hbaseConfiguration
       )
 
@@ -279,7 +293,7 @@ class PhoenixSparkIT extends FunSuite with Matchers with BeforeAndAfterAll {
     val stmt = conn.createStatement()
     val rs = stmt.executeQuery("SELECT ID, COL1, COL2 FROM OUTPUT_TEST_TABLE")
     val results = ListBuffer[(Long, String, Int)]()
-    while(rs.next()) {
+    while (rs.next()) {
       results.append((rs.getLong(1), rs.getString(2), rs.getInt(3)))
     }
 
@@ -306,7 +320,7 @@ class PhoenixSparkIT extends FunSuite with Matchers with BeforeAndAfterAll {
     val stmt = conn.createStatement()
     val rs = stmt.executeQuery("SELECT COL3 FROM OUTPUT_TEST_TABLE WHERE ID = 1 OR ID = 2 ORDER BY ID ASC")
     val results = ListBuffer[java.sql.Date]()
-    while(rs.next()) {
+    while (rs.next()) {
       results.append(rs.getDate(1))
     }
 
@@ -315,12 +329,89 @@ class PhoenixSparkIT extends FunSuite with Matchers with BeforeAndAfterAll {
     results(1).getTime shouldEqual date.getTime
   }
 
-  test("Not specifying a zkUrl or a config quorum URL should fail") {
-    intercept[UnsupportedOperationException] {
-      val sqlContext = new SQLContext(sc)
-      val badConf = new Configuration(hbaseConfiguration)
-      badConf.unset(HConstants.ZOOKEEPER_QUORUM)
-      sqlContext.phoenixTableAsDataFrame("TABLE1", Array("ID", "COL1"), conf = badConf)
+  test("Can infer schema without defining columns") {
+    val sqlContext = new SQLContext(sc)
+    val df = sqlContext.phoenixTableAsDataFrame("TABLE2", Seq(), conf = hbaseConfiguration)
+    df.schema("ID").dataType shouldEqual LongType
+    df.schema("TABLE1_ID").dataType shouldEqual LongType
+    df.schema("t2col1").dataType shouldEqual StringType
+  }
+
+  test("Spark SQL can use Phoenix as a data source with no schema specified") {
+    val sqlContext = new SQLContext(sc)
+    val df = sqlContext.load("org.apache.phoenix.spark", Map("table" -> "TABLE1",
+      "zkUrl" -> quorumAddress))
+    df.count() shouldEqual 2
+    df.schema("ID").dataType shouldEqual LongType
+    df.schema("COL1").dataType shouldEqual StringType
+  }
+
+  test("Spark SQL can use Phoenix as a data source with PrunedFilteredScan") {
+    val sqlContext = new SQLContext(sc)
+    val df = sqlContext.load("org.apache.phoenix.spark", Map("table" -> "TABLE1",
+      "zkUrl" -> quorumAddress))
+    val res = df.filter(df("COL1") === "test_row_1" && df("ID") === 1L).select(df("ID"))
+
+    // Make sure we got the right value back
+    assert(res.first().getLong(0) == 1L)
+
+    /*
+      NOTE: There doesn't appear to be any way of verifying from the Spark query planner that
+      filtering is being pushed down and done server-side. However, since PhoenixRelation
+      implements PrunedFilteredScan, debugging has shown that both the SELECT columns and WHERE
+      predicates are being passed along to us, which we then forward to Phoenix.
+      TODO: investigate further to find a way to verify server-side pushdown
+     */
+  }
+
+  test("Can persist a dataframe using 'DataFrame.saveToPhoenix'") {
+    // Load from TABLE1
+    val sqlContext = new SQLContext(sc)
+    val df = sqlContext.load("org.apache.phoenix.spark", Map("table" -> "TABLE1",
+      "zkUrl" -> quorumAddress))
+
+    // Save to TABLE1_COPY
+    df.saveToPhoenix("TABLE1_COPY", zkUrl = Some(quorumAddress))
+
+    // Verify results
+    val stmt = conn.createStatement()
+    val rs = stmt.executeQuery("SELECT * FROM TABLE1_COPY")
+
+    val checkResults = List((1L, "test_row_1"), (2L, "test_row_2"))
+    val results = ListBuffer[(Long, String)]()
+    while (rs.next()) {
+      results.append((rs.getLong(1), rs.getString(2)))
     }
+    stmt.close()
+
+    results.toList shouldEqual checkResults
   }
-}
+
+  test("Can persist a dataframe using 'DataFrame.save()") {
+    // Clear TABLE1_COPY
+    var stmt = conn.createStatement()
+    stmt.executeUpdate("DELETE FROM TABLE1_COPY")
+    stmt.close()
+
+    // Load TABLE1, save as TABLE1_COPY
+    val sqlContext = new SQLContext(sc)
+    val df = sqlContext.load("org.apache.phoenix.spark", Map("table" -> "TABLE1",
+      "zkUrl" -> quorumAddress))
+
+    // Save to TABLE1_COPY
+    df.save("org.apache.phoenix.spark", SaveMode.Overwrite, Map("table" -> "TABLE1_COPY", "zkUrl" -> quorumAddress))
+
+    // Verify results
+    stmt = conn.createStatement()
+    val rs = stmt.executeQuery("SELECT * FROM TABLE1_COPY")
+
+    val checkResults = List((1L, "test_row_1"), (2L, "test_row_2"))
+    val results = ListBuffer[(Long, String)]()
+    while (rs.next()) {
+      results.append((rs.getLong(1), rs.getString(2)))
+    }
+    stmt.close()
+
+    results.toList shouldEqual checkResults
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/3fb3bb4d/phoenix-spark/src/main/scala/org/apache/phoenix/spark/ConfigurationUtil.scala
----------------------------------------------------------------------
diff --git a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/ConfigurationUtil.scala b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/ConfigurationUtil.scala
new file mode 100644
index 0000000..c0c7248
--- /dev/null
+++ b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/ConfigurationUtil.scala
@@ -0,0 +1,65 @@
+/*
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+ */
+package org.apache.phoenix.spark
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.hbase.{HBaseConfiguration, HConstants}
+import org.apache.phoenix.mapreduce.util.{ColumnInfoToStringEncoderDecoder, PhoenixConfigurationUtil}
+import org.apache.phoenix.util.ColumnInfo
+import scala.collection.JavaConversions._
+
+object ConfigurationUtil extends Serializable {
+
+  def getOutputConfiguration(tableName: String, columns: Seq[String], zkUrl: Option[String], conf: Option[Configuration]): Configuration = {
+
+    // Create an HBaseConfiguration object from the passed in config, if present
+    val config = conf match {
+      case Some(c) => HBaseConfiguration.create(c)
+      case _ => HBaseConfiguration.create()
+    }
+
+    // Set the table to save to
+    PhoenixConfigurationUtil.setOutputTableName(config, tableName)
+
+    // Infer column names from the DataFrame schema
+    PhoenixConfigurationUtil.setUpsertColumnNames(config, columns.mkString(","))
+
+    // Override the Zookeeper URL if present. Throw exception if no address given.
+    zkUrl match {
+      case Some(url) => config.set(HConstants.ZOOKEEPER_QUORUM, url )
+      case _ => {
+        if(config.get(HConstants.ZOOKEEPER_QUORUM) == null) {
+          throw new UnsupportedOperationException(
+            s"One of zkUrl or '${HConstants.ZOOKEEPER_QUORUM}' config property must be provided"
+          )
+        }
+      }
+    }
+
+    // Return the configuration object
+    config
+  }
+
+  // Return a serializable representation of the columns
+  def encodeColumns(conf: Configuration): String = {
+    ColumnInfoToStringEncoderDecoder.encode(
+      PhoenixConfigurationUtil.getUpsertColumnMetadataList(conf)
+    )
+  }
+
+  // Decode the columns to a list of ColumnInfo objects
+  def decodeColumns(encodedColumns: String): List[ColumnInfo] = {
+    ColumnInfoToStringEncoderDecoder.decode(encodedColumns).toList
+  }
+}
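
For reference, the two write paths in this patch (`ProductRDDFunctions.saveToPhoenix` and `DataFrameFunctions.saveToPhoenix`) both go through this object. A minimal sketch of that flow, assuming `OUTPUT_TEST_TABLE` exists as in the README and that `phoenix-server:2181` is a placeholder for a reachable ZooKeeper quorum:

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.phoenix.spark.ConfigurationUtil

// Build the output configuration: sets the upsert table and column names and
// resolves the ZooKeeper quorum from zkUrl (throws if neither zkUrl nor
// hbase.zookeeper.quorum is set).
val config = ConfigurationUtil.getOutputConfiguration(
  "OUTPUT_TEST_TABLE",
  Seq("ID", "COL1", "COL2"),
  zkUrl = Some("phoenix-server:2181"),
  conf = Some(new Configuration())
)

// The upsert column metadata is encoded to a String so it can be shipped to
// executors, then decoded back into ColumnInfo objects inside
// PhoenixRecordWritable.write().
val encodedColumns = ConfigurationUtil.encodeColumns(config)
val columnInfo = ConfigurationUtil.decodeColumns(encodedColumns)
```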

http://git-wip-us.apache.org/repos/asf/phoenix/blob/3fb3bb4d/phoenix-spark/src/main/scala/org/apache/phoenix/spark/DataFrameFunctions.scala
----------------------------------------------------------------------
diff --git a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/DataFrameFunctions.scala b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/DataFrameFunctions.scala
new file mode 100644
index 0000000..e17d7a5
--- /dev/null
+++ b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/DataFrameFunctions.scala
@@ -0,0 +1,51 @@
+/*
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+ */
+package org.apache.phoenix.spark
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.hbase.HConstants
+import org.apache.hadoop.io.NullWritable
+import org.apache.phoenix.mapreduce.PhoenixOutputFormat
+import org.apache.phoenix.mapreduce.util.{ColumnInfoToStringEncoderDecoder, PhoenixConfigurationUtil}
+import org.apache.spark.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.DataFrame
+
+class DataFrameFunctions(data: DataFrame) extends Logging with Serializable {
+
+  def saveToPhoenix(tableName: String, conf: Configuration = new Configuration,
+                    zkUrl: Option[String] = None): Unit = {
+
+    val config = ConfigurationUtil.getOutputConfiguration(tableName, data.schema.fieldNames, zkUrl, Some(conf))
+
+    // Encode the column info to a serializable type
+    val encodedColumns = ConfigurationUtil.encodeColumns(config)
+
+    // Map the row object into a PhoenixRecordWritable
+    val phxRDD: RDD[(NullWritable, PhoenixRecordWritable)] = data.map { row =>
+      val rec = new PhoenixRecordWritable(encodedColumns)
+      row.toSeq.foreach { e => rec.add(e) }
+      (null, rec)
+    }
+
+    // Save it
+    phxRDD.saveAsNewAPIHadoopFile(
+      "",
+      classOf[NullWritable],
+      classOf[PhoenixRecordWritable],
+      classOf[PhoenixOutputFormat[PhoenixRecordWritable]],
+      config
+    )
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/3fb3bb4d/phoenix-spark/src/main/scala/org/apache/phoenix/spark/DefaultSource.scala
----------------------------------------------------------------------
diff --git a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/DefaultSource.scala b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/DefaultSource.scala
new file mode 100644
index 0000000..b0e9754
--- /dev/null
+++ b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/DefaultSource.scala
@@ -0,0 +1,41 @@
+package org.apache.phoenix.spark
+
+import org.apache.spark.sql.{SaveMode, DataFrame, SQLContext}
+import org.apache.spark.sql.sources.{CreatableRelationProvider, BaseRelation, RelationProvider}
+import org.apache.phoenix.spark._
+
+class DefaultSource extends RelationProvider with CreatableRelationProvider {
+
+  // Override 'RelationProvider.createRelation', this enables DataFrame.load()
+  override def createRelation(sqlContext: SQLContext, parameters: Map[String, String]): BaseRelation = {
+    verifyParameters(parameters)
+
+    new PhoenixRelation(
+      parameters("table"),
+      parameters("zkUrl")
+    )(sqlContext)
+  }
+
+  // Override 'CreatableRelationProvider.createRelation', this enables DataFrame.save()
+  override def createRelation(sqlContext: SQLContext, mode: SaveMode,
+                              parameters: Map[String, String], data: DataFrame): BaseRelation = {
+
+    if (!mode.equals(SaveMode.Overwrite)) {
+      throw new Exception("SaveMode other than SaveMode.Overwrite is not supported")
+    }
+
+    verifyParameters(parameters)
+
+    // Save the DataFrame to Phoenix
+    data.saveToPhoenix(parameters("table"), zkUrl = parameters.get("zkUrl"))
+
+    // Return a relation of the saved data
+    createRelation(sqlContext, parameters)
+  }
+
+  // Ensure the required parameters are present
+  def verifyParameters(parameters: Map[String, String]): Unit = {
+    if (parameters.get("table").isEmpty) throw new RuntimeException("No Phoenix 'table' option defined")
+    if (parameters.get("zkUrl").isEmpty) throw new RuntimeException("No Phoenix 'zkUrl' option defined")
+  }
+}
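
`DefaultSource` is the class Spark's Data Source API instantiates when a query names the `org.apache.phoenix.spark` package, so the README's `sqlContext.load(...)` and `df.save(...)` calls resolve to the two `createRelation` overloads above. A minimal sketch of both paths, reusing the README's TABLE1/OUTPUT_TABLE DDL with a placeholder zkUrl:

```scala
import org.apache.spark.SparkContext
import org.apache.spark.sql.{SQLContext, SaveMode}

val sc = new SparkContext("local", "phoenix-test")
val sqlContext = new SQLContext(sc)

// RelationProvider path: returns a PhoenixRelation for reading
val df = sqlContext.load("org.apache.phoenix.spark",
  Map("table" -> "TABLE1", "zkUrl" -> "phoenix-server:2181"))

// CreatableRelationProvider path: only SaveMode.Overwrite is accepted,
// any other mode throws before a save is attempted
df.save("org.apache.phoenix.spark", SaveMode.Overwrite,
  Map("table" -> "OUTPUT_TABLE", "zkUrl" -> "phoenix-server:2181"))
```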

http://git-wip-us.apache.org/repos/asf/phoenix/blob/3fb3bb4d/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRDD.scala
----------------------------------------------------------------------
diff --git a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRDD.scala b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRDD.scala
index b27f9f9..9a359e3 100644
--- a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRDD.scala
+++ b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRDD.scala
@@ -14,7 +14,7 @@
 package org.apache.phoenix.spark
 
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.hbase.HConstants
+import org.apache.hadoop.hbase.{HBaseConfiguration, HConstants}
 import org.apache.hadoop.io.NullWritable
 import org.apache.phoenix.mapreduce.PhoenixInputFormat
 import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil
@@ -65,12 +65,12 @@ class PhoenixRDD(sc: SparkContext, table: String, columns: Seq[String],
 
   def buildSql(table: String, columns: Seq[String], predicate: Option[String]): String = {
     val query = "SELECT %s FROM \"%s\"".format(
-      columns.map(f => "\"" + f + "\"").mkString(", "),
+      if (columns.isEmpty) "*" else columns.map(f => "\"" + f + "\"").mkString(", "),
       table
     )
 
     query + (predicate match {
-      case Some(p: String) => " WHERE " + p
+      case Some(p: String) if p.length > 0 => " WHERE " + p
       case _ => ""
     })
   }
@@ -79,10 +79,12 @@ class PhoenixRDD(sc: SparkContext, table: String, columns: Seq[String],
 
     // This is just simply not serializable, so don't try, but clone it because
     // PhoenixConfigurationUtil mutates it.
-    val config = new Configuration(conf)
+    val config = HBaseConfiguration.create(conf)
 
     PhoenixConfigurationUtil.setInputQuery(config, buildSql(table, columns, predicate))
-    PhoenixConfigurationUtil.setSelectColumnNames(config, columns.mkString(","))
+    if(!columns.isEmpty) {
+      PhoenixConfigurationUtil.setSelectColumnNames(config, columns.mkString(","))
+    }
     PhoenixConfigurationUtil.setInputTableName(config, "\"" + table + "\"")
     PhoenixConfigurationUtil.setInputClass(config, classOf[PhoenixRecordWritable])
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/3fb3bb4d/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRecordWritable.scala
----------------------------------------------------------------------
diff --git a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRecordWritable.scala b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRecordWritable.scala
index 48a70ec..67e0bd2 100644
--- a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRecordWritable.scala
+++ b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRecordWritable.scala
@@ -31,7 +31,7 @@ class PhoenixRecordWritable(var encodedColumns: String) extends DBWritable {
 
   override def write(statement: PreparedStatement): Unit = {
     // Decode the ColumnInfo list
-    val columns = ColumnInfoToStringEncoderDecoder.decode(encodedColumns).toList
+    val columns = ConfigurationUtil.decodeColumns(encodedColumns)
 
     // Make sure we at least line up in size
     if(upsertValues.length != columns.length) {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/3fb3bb4d/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRelation.scala
----------------------------------------------------------------------
diff --git a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRelation.scala b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRelation.scala
new file mode 100644
index 0000000..4177022
--- /dev/null
+++ b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRelation.scala
@@ -0,0 +1,80 @@
+package org.apache.phoenix.spark
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.{Row, SQLContext}
+import org.apache.spark.sql.sources._
+import org.apache.commons.lang.StringEscapeUtils.escapeSql
+
+case class PhoenixRelation(tableName: String, zkUrl: String)(@transient val sqlContext: SQLContext)
+    extends BaseRelation with PrunedFilteredScan {
+
+  /*
+    This is the buildScan() implementing Spark's PrunedFilteredScan.
+    Spark SQL queries with columns or predicates specified will be pushed down
+    to us here, and we can pass that on to Phoenix. According to the docs, this
+    is an optimization, and the filtering/pruning will be re-evaluated again,
+    but this prevents having to load the whole table into Spark first.
+  */
+  override def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = {
+    new PhoenixRDD(
+      sqlContext.sparkContext,
+      tableName,
+      requiredColumns,
+      Some(buildFilter(filters)),
+      Some(zkUrl),
+      new Configuration()
+    ).toDataFrame(sqlContext).rdd
+  }
+
+  // Required by BaseRelation, this will return the full schema for a table
+  override def schema: StructType = {
+    new PhoenixRDD(
+      sqlContext.sparkContext,
+      tableName,
+      Seq(),
+      None,
+      Some(zkUrl),
+      new Configuration()
+    ).toDataFrame(sqlContext).schema
+  }
+
+  // Attempt to create Phoenix-accepted WHERE clauses from Spark filters,
+  // mostly inspired from Spark SQL JDBCRDD and the couchbase-spark-connector
+  private def buildFilter(filters: Array[Filter]): String = {
+    if (filters.isEmpty) {
+      return ""
+    }
+
+    val filter = new StringBuilder("")
+    var i = 0
+
+    filters.foreach(f => {
+      if (i > 0) {
+        filter.append(" AND")
+      }
+
+      f match {
+        case EqualTo(attr, value) => filter.append(s" $attr = ${compileValue(value)}")
+        case GreaterThan(attr, value) => filter.append(s" $attr > ${compileValue(value)}")
+        case GreaterThanOrEqual(attr, value) => filter.append(s" $attr >= ${compileValue(value)}")
+        case LessThan(attr, value) => filter.append(s" $attr < ${compileValue(value)}")
+        case LessThanOrEqual(attr, value) => filter.append(s" $attr <= ${compileValue(value)}")
+        case IsNull(attr) => filter.append(s" $attr IS NULL")
+        case IsNotNull(attr) => filter.append(s" $attr IS NOT NULL")
+        case _ => throw new Exception("Unsupported filter")
+      }
+
+      i = i + 1
+    })
+
+    filter.toString()
+  }
+
+  // Helper function to escape string values in SQL queries
+  private def compileValue(value: Any): Any = value match {
+    case stringValue: String => s"'${escapeSql(stringValue)}'"
+    case _ => value
+  }
+}
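
To see what reaches Phoenix when Spark pushes column pruning and filters into `buildScan`, here is a small standalone sketch that mirrors the private `buildFilter`/`compileValue` logic for the two filters used in the PrunedFilteredScan integration test. It is an illustration only: it handles just `EqualTo` and omits the `escapeSql` call and the other filter cases.

```scala
import org.apache.spark.sql.sources.{EqualTo, Filter}

object PushdownSketch {
  // Quote string literals; other values pass through unchanged
  private def compileValue(value: Any): Any = value match {
    case s: String => s"'$s'"
    case _         => value
  }

  // Simplified buildFilter: AND the supported filters together
  def buildFilter(filters: Array[Filter]): String =
    filters.map {
      case EqualTo(attr, value) => s"$attr = ${compileValue(value)}"
      case f                    => throw new Exception(s"Unsupported filter: $f")
    }.mkString(" AND ")

  def main(args: Array[String]): Unit = {
    val where = buildFilter(Array(EqualTo("COL1", "test_row_1"), EqualTo("ID", 1L)))
    // Prints: COL1 = 'test_row_1' AND ID = 1
    println(where)
    // PhoenixRDD.buildSql then wraps this (modulo whitespace) as:
    //   SELECT "ID" FROM "TABLE1" WHERE COL1 = 'test_row_1' AND ID = 1
  }
}
```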

http://git-wip-us.apache.org/repos/asf/phoenix/blob/3fb3bb4d/phoenix-spark/src/main/scala/org/apache/phoenix/spark/ProductRDDFunctions.scala
----------------------------------------------------------------------
diff --git a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/ProductRDDFunctions.scala b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/ProductRDDFunctions.scala
index 2926569..3d24fb9 100644
--- a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/ProductRDDFunctions.scala
+++ b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/ProductRDDFunctions.scala
@@ -27,27 +27,10 @@ class ProductRDDFunctions[A <: Product](data: RDD[A]) extends Logging with Seria
                     conf: Configuration = new Configuration, zkUrl: Option[String] = None)
                     : Unit = {
 
-    // Setup Phoenix output configuration, make a local copy
-    val config = new Configuration(conf)
-    PhoenixConfigurationUtil.setOutputTableName(config, tableName)
-    PhoenixConfigurationUtil.setUpsertColumnNames(config, cols.mkString(","))
-
-    // Override the Zookeeper URL if present. Throw exception if no address given.
-    zkUrl match {
-      case Some(url) => config.set(HConstants.ZOOKEEPER_QUORUM, url )
-      case _ => {
-        if(config.get(HConstants.ZOOKEEPER_QUORUM) == null) {
-          throw new UnsupportedOperationException(
-            s"One of zkUrl or '${HConstants.ZOOKEEPER_QUORUM}' config property must be provided"
-          )
-        }
-      }
-    }
+    val config = ConfigurationUtil.getOutputConfiguration(tableName, cols, zkUrl, Some(conf))
 
     // Encode the column info to a serializable type
-    val encodedColumns = ColumnInfoToStringEncoderDecoder.encode(
-      PhoenixConfigurationUtil.getUpsertColumnMetadataList(config)
-    )
+    val encodedColumns = ConfigurationUtil.encodeColumns(config)
 
     // Map each element of the product to a new (NullWritable, PhoenixRecordWritable)
     val phxRDD: RDD[(NullWritable, PhoenixRecordWritable)] = data.map { e =>

http://git-wip-us.apache.org/repos/asf/phoenix/blob/3fb3bb4d/phoenix-spark/src/main/scala/org/apache/phoenix/spark/package.scala
----------------------------------------------------------------------
diff --git a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/package.scala b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/package.scala
index c19ec16..3fed79e 100644
--- a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/package.scala
+++ b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/package.scala
@@ -15,7 +15,7 @@ package org.apache.phoenix
 
 import org.apache.spark.SparkContext
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.SQLContext
+import org.apache.spark.sql.{DataFrame, SQLContext}
 
 package object spark {
   implicit def toProductRDDFunctions[A <: Product](rdd: RDD[A]): ProductRDDFunctions[A] = {
@@ -29,4 +29,8 @@ package object spark {
   implicit def toSparkSqlContextFunctions(sqlContext: SQLContext): SparkSqlContextFunctions = {
     new SparkSqlContextFunctions(sqlContext)
   }
+
+  implicit def toDataFrameFunctions(data: DataFrame): DataFrameFunctions = {
+    new DataFrameFunctions(data)
+  }
 }
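
With the new `toDataFrameFunctions` implicit in scope, `saveToPhoenix` is available directly on a DataFrame, alongside the existing RDD and SQLContext helpers. A brief sketch, assuming the TABLE1 and TABLE1_COPY tables from setup.sql and a placeholder zkUrl:

```scala
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext
import org.apache.phoenix.spark._   // brings the implicit conversions into scope

val sc = new SparkContext("local", "phoenix-test")
val sqlContext = new SQLContext(sc)

val df = sqlContext.phoenixTableAsDataFrame(
  "TABLE1", Array("ID", "COL1"), zkUrl = Some("phoenix-server:2181"))

// New in this patch: DataFrame.saveToPhoenix via toDataFrameFunctions
df.saveToPhoenix("TABLE1_COPY", zkUrl = Some("phoenix-server:2181"))
```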

http://git-wip-us.apache.org/repos/asf/phoenix/blob/3fb3bb4d/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index b81dfb5..977218d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -428,6 +428,11 @@
         <artifactId>phoenix-pig</artifactId>
         <version>${project.version}</version>
       </dependency>
+      <dependency>
+        <groupId>org.apache.phoenix</groupId>
+        <artifactId>phoenix-spark</artifactId>
+        <version>${project.version}</version>
+      </dependency>
 
       <!-- HBase dependencies -->
       <dependency>