Posted to commits@phoenix.apache.org by ra...@apache.org on 2015/04/05 09:47:20 UTC

phoenix git commit: PHOENIX-1071 Add phoenix-spark for Spark integration

Repository: phoenix
Updated Branches:
  refs/heads/4.x-HBase-0.98 de6f73339 -> 976e37c0f


PHOENIX-1071 Add phoenix-spark for Spark integration


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/976e37c0
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/976e37c0
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/976e37c0

Branch: refs/heads/4.x-HBase-0.98
Commit: 976e37c0f7df59adb404c3d884a09b0901c08775
Parents: de6f733
Author: ravimagham <ra...@apache.org>
Authored: Sun Apr 5 00:28:41 2015 -0700
Committer: ravimagham <ra...@apache.org>
Committed: Sun Apr 5 00:28:41 2015 -0700

----------------------------------------------------------------------
 NOTICE                                          |  34 +-
 phoenix-spark/README.md                         |  89 ++++
 phoenix-spark/pom.xml                           | 523 +++++++++++++++++++
 .../org/apache/phoenix/spark/PhoenixRDD.scala   | 166 ++++++
 .../phoenix/spark/PhoenixRecordWritable.scala   |  89 ++++
 .../phoenix/spark/ProductRDDFunctions.scala     |  68 +++
 .../phoenix/spark/SparkContextFunctions.scala   |  41 ++
 .../spark/SparkSqlContextFunctions.scala        |  39 ++
 .../org/apache/phoenix/spark/package.scala      |  32 ++
 phoenix-spark/src/test/resources/log4j.xml      |  41 ++
 phoenix-spark/src/test/resources/setup.sql      |  18 +
 .../apache/phoenix/spark/PhoenixRDDTest.scala   | 333 ++++++++++++
 pom.xml                                         |   1 +
 13 files changed, 1470 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/976e37c0/NOTICE
----------------------------------------------------------------------
diff --git a/NOTICE b/NOTICE
index 093ae02..a8635b0 100644
--- a/NOTICE
+++ b/NOTICE
@@ -1,5 +1,31 @@
-Apache Phoenix
-Copyright 2014 The Apache Software Foundation
+This product includes software developed by The Apache Software
+Foundation (http://www.apache.org/).
+
+It includes software from other Apache Software Foundation projects,
+including, but not limited to:
+  - Apache HBase
+  - Apache Hadoop
+  - Apache Commons
+
+In addition, this product includes/uses software developed by:
+
+JLine (http://jline.sourceforge.net/),
+Copyright (c) 2002-2006, Marc Prud'hommeaux <mw...@cornell.edu>.
+
+SQLLine (http://sqlline.sourceforge.net/),
+Copyright (c) 2002-2006, Marc Prud'hommeaux <mw...@cornell.edu>.
+
+SLF4J (http://www.slf4j.org/),
+Copyright (c) 2004-2008 QOS.ch
+
+ANTLR (http://www.antlr.org/),
+Copyright (c) 2003-2008, Terence Parr.
+
+JUnit (http://www.junit.org/) included under the Common Public License v1.0.
+See the full text here: http://junit.sourceforge.net/cpl-v10.html
+
+The phoenix-spark module has been adapted from the phoenix-spark library
+distributed under the terms of the Apache 2 license. Original source copyright:
+Copyright 2014 Simply Measured, Inc.
+Copyright 2015 Interset Software Inc.
 
-This product includes software developed at
-The Apache Software Foundation (http://www.apache.org/).

http://git-wip-us.apache.org/repos/asf/phoenix/blob/976e37c0/phoenix-spark/README.md
----------------------------------------------------------------------
diff --git a/phoenix-spark/README.md b/phoenix-spark/README.md
new file mode 100644
index 0000000..1c030f8
--- /dev/null
+++ b/phoenix-spark/README.md
@@ -0,0 +1,89 @@
+phoenix-spark extends Phoenix's MapReduce support to allow Spark to load Phoenix tables as RDDs or
+DataFrames, and enables persisting RDDs of Tuples back to Phoenix.
+
+## Reading Phoenix Tables
+
+Given a Phoenix table with the following DDL
+
+```sql
+CREATE TABLE TABLE1 (ID BIGINT NOT NULL PRIMARY KEY, COL1 VARCHAR);
+UPSERT INTO TABLE1 (ID, COL1) VALUES (1, 'test_row_1');
+UPSERT INTO TABLE1 (ID, COL1) VALUES (2, 'test_row_2');
+```
+
+### Load as a DataFrame
+```scala
+import org.apache.spark.SparkContext
+import org.apache.spark.sql.SQLContext
+import org.apache.phoenix.spark._
+
+val sc = new SparkContext("local", "phoenix-test")
+val sqlContext = new SQLContext(sc)
+
+// Load the columns 'ID' and 'COL1' from TABLE1 as a DataFrame
+val df = sqlContext.phoenixTableAsDataFrame(
+  "TABLE1", Array("ID", "COL1"), zkUrl = Some("phoenix-server:2181")
+)
+
+df.show
+```
+
+### Load as an RDD
+```scala
+import org.apache.spark.SparkContext
+import org.apache.spark.rdd.RDD
+import org.apache.phoenix.spark._
+
+val sc = new SparkContext("local", "phoenix-test")
+
+// Load the columns 'ID' and 'COL1' from TABLE1 as an RDD
+val rdd: RDD[Map[String, AnyRef]] = sc.phoenixTableAsRDD(
+  "TABLE1", Seq("ID", "COL1"), zkUrl = Some("phoenix-server:2181")
+)
+
+rdd.count()
+
+val firstId = rdd.first()("ID").asInstanceOf[Long]
+val firstCol = rdd.first()("COL1").asInstanceOf[String]
+```
+
+## Saving RDDs to Phoenix
+
+Given a Phoenix table with the following DDL
+
+```sql
+CREATE TABLE OUTPUT_TEST_TABLE (id BIGINT NOT NULL PRIMARY KEY, col1 VARCHAR, col2 INTEGER);
+```
+
+`saveToPhoenix` is an implicit method on `RDD[Product]` (an RDD of Tuples). The data types must
+correspond to the Java types Phoenix supports (http://phoenix.apache.org/language/datatypes.html).
+
+```scala
+import org.apache.spark.SparkContext
+import org.apache.phoenix.spark._
+
+val sc = new SparkContext("local", "phoenix-test")
+val dataSet = List((1L, "1", 1), (2L, "2", 2), (3L, "3", 3))
+
+sc
+  .parallelize(dataSet)
+  .saveToPhoenix(
+    "OUTPUT_TEST_TABLE",
+    Seq("ID","COL1","COL2"),
+    zkUrl = Some("phoenix-server:2181")
+  )
+```
+
+## Notes
+
+The functions `phoenixTableAsDataFrame`, `phoenixTableAsRDD` and `saveToPhoenix` all support
+optionally specifying a `conf` Hadoop configuration parameter with custom Phoenix client settings,
+as well as an optional `zkUrl` parameter for the Phoenix connection URL.
+
+If `zkUrl` isn't specified, it's assumed that the "hbase.zookeeper.quorum" property has been set
+in the `conf` parameter. Similarly, if no configuration is passed in, `zkUrl` must be specified.
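+
+As a minimal sketch of the configuration-based alternative (assuming the same example
+"phoenix-server:2181" quorum and the TABLE1 schema from above):
+
+```scala
+import org.apache.hadoop.conf.Configuration
+import org.apache.spark.SparkContext
+import org.apache.phoenix.spark._
+
+val conf = new Configuration()
+// When no zkUrl is given, the quorum is read from this property
+conf.set("hbase.zookeeper.quorum", "phoenix-server:2181")
+
+val sc = new SparkContext("local", "phoenix-test")
+
+val rdd = sc.phoenixTableAsRDD("TABLE1", Seq("ID", "COL1"), conf = conf)
+```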
+
+## Limitations
+
+- No pushdown predicate support from Spark SQL (yet)
+- No support for aggregate or distinct functions (http://phoenix.apache.org/phoenix_mr.html)
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/976e37c0/phoenix-spark/pom.xml
----------------------------------------------------------------------
diff --git a/phoenix-spark/pom.xml b/phoenix-spark/pom.xml
new file mode 100644
index 0000000..7529e08
--- /dev/null
+++ b/phoenix-spark/pom.xml
@@ -0,0 +1,523 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>org.apache.phoenix</groupId>
+    <artifactId>phoenix</artifactId>
+    <version>4.4.0-HBase-0.98-SNAPSHOT</version>
+  </parent>
+  <artifactId>phoenix-spark</artifactId>
+  <name>Phoenix - Spark</name>
+
+  <properties>
+    <spark.version>1.3.0</spark.version>
+    <scala.version>2.10.4</scala.version>
+    <scala.binary.version>2.10</scala.binary.version>
+  </properties>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.phoenix</groupId>
+      <artifactId>phoenix-core</artifactId>
+    </dependency>
+
+    <!-- Force import of Spark's servlet API for unit tests -->
+    <dependency>
+      <groupId>javax.servlet</groupId>
+      <artifactId>javax.servlet-api</artifactId>
+      <version>3.0.1</version>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.scala-lang</groupId>
+      <artifactId>scala-library</artifactId>
+      <version>${scala.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.scalatest</groupId>
+      <artifactId>scalatest_${scala.binary.version}</artifactId>
+      <version>2.2.2</version>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.scalamock</groupId>
+      <artifactId>scalamock-scalatest-support_${scala.binary.version}</artifactId>
+      <version>3.1.4</version>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-core_${scala.binary.version}</artifactId>
+      <version>${spark.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-sql_${scala.binary.version}</artifactId>
+      <version>${spark.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.xerial.snappy</groupId>
+      <artifactId>snappy-java</artifactId>
+      <version>1.1.1.6</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-client</artifactId>
+      <version>${hadoop-two.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>log4j</groupId>
+          <artifactId>log4j</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>javax.servlet</groupId>
+          <artifactId>servlet-api</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>javax.servlet.jsp</groupId>
+          <artifactId>jsp-api</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.jruby</groupId>
+          <artifactId>jruby-complete</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.jboss.netty</groupId>
+          <artifactId>netty</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>io.netty</groupId>
+          <artifactId>netty</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-common</artifactId>
+      <version>${hadoop-two.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>log4j</groupId>
+          <artifactId>log4j</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>javax.servlet</groupId>
+          <artifactId>servlet-api</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>javax.servlet.jsp</groupId>
+          <artifactId>jsp-api</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.jruby</groupId>
+          <artifactId>jruby-complete</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.jboss.netty</groupId>
+          <artifactId>netty</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>io.netty</groupId>
+          <artifactId>netty</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-common</artifactId>
+      <version>${hadoop-two.version}</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>log4j</groupId>
+          <artifactId>log4j</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>javax.servlet</groupId>
+          <artifactId>servlet-api</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>javax.servlet.jsp</groupId>
+          <artifactId>jsp-api</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.jruby</groupId>
+          <artifactId>jruby-complete</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.jboss.netty</groupId>
+          <artifactId>netty</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>io.netty</groupId>
+          <artifactId>netty</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-hdfs</artifactId>
+      <version>${hadoop-two.version}</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>log4j</groupId>
+          <artifactId>log4j</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>javax.servlet</groupId>
+          <artifactId>servlet-api</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>javax.servlet.jsp</groupId>
+          <artifactId>jsp-api</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.jruby</groupId>
+          <artifactId>jruby-complete</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.jboss.netty</groupId>
+          <artifactId>netty</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>io.netty</groupId>
+          <artifactId>netty</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-client</artifactId>
+      <exclusions>
+        <exclusion>
+          <groupId>log4j</groupId>
+          <artifactId>log4j</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.thrift</groupId>
+          <artifactId>thrift</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.jruby</groupId>
+          <artifactId>jruby-complete</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>slf4j-log4j12</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.mortbay.jetty</groupId>
+          <artifactId>jsp-2.1</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.mortbay.jetty</groupId>
+          <artifactId>jsp-api-2.1</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.mortbay.jetty</groupId>
+          <artifactId>servlet-api-2.5</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>com.sun.jersey</groupId>
+          <artifactId>jersey-core</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>com.sun.jersey</groupId>
+          <artifactId>jersey-json</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>com.sun.jersey</groupId>
+          <artifactId>jersey-server</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.mortbay.jetty</groupId>
+          <artifactId>jetty</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.mortbay.jetty</groupId>
+          <artifactId>jetty-util</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>tomcat</groupId>
+          <artifactId>jasper-runtime</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>tomcat</groupId>
+          <artifactId>jasper-compiler</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.jruby</groupId>
+          <artifactId>jruby-complete</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.jboss.netty</groupId>
+          <artifactId>netty</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>io.netty</groupId>
+          <artifactId>netty</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-hadoop-compat</artifactId>
+      <version>${hbase.version}</version>
+      <scope>test</scope>
+      <type>test-jar</type>
+      <exclusions>
+        <exclusion>
+          <groupId>log4j</groupId>
+          <artifactId>log4j</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.thrift</groupId>
+          <artifactId>thrift</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.jruby</groupId>
+          <artifactId>jruby-complete</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>slf4j-log4j12</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.mortbay.jetty</groupId>
+          <artifactId>jsp-2.1</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.mortbay.jetty</groupId>
+          <artifactId>jsp-api-2.1</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.mortbay.jetty</groupId>
+          <artifactId>servlet-api-2.5</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>com.sun.jersey</groupId>
+          <artifactId>jersey-core</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>com.sun.jersey</groupId>
+          <artifactId>jersey-json</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>com.sun.jersey</groupId>
+          <artifactId>jersey-server</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.mortbay.jetty</groupId>
+          <artifactId>jetty</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.mortbay.jetty</groupId>
+          <artifactId>jetty-util</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>tomcat</groupId>
+          <artifactId>jasper-runtime</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>tomcat</groupId>
+          <artifactId>jasper-compiler</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.jruby</groupId>
+          <artifactId>jruby-complete</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.jboss.netty</groupId>
+          <artifactId>netty</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>io.netty</groupId>
+          <artifactId>netty</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-hadoop2-compat</artifactId>
+      <version>${hbase.version}</version>
+      <scope>test</scope>
+      <type>test-jar</type>
+      <exclusions>
+        <exclusion>
+          <groupId>log4j</groupId>
+          <artifactId>log4j</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.thrift</groupId>
+          <artifactId>thrift</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.jruby</groupId>
+          <artifactId>jruby-complete</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>slf4j-log4j12</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.mortbay.jetty</groupId>
+          <artifactId>jsp-2.1</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.mortbay.jetty</groupId>
+          <artifactId>jsp-api-2.1</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.mortbay.jetty</groupId>
+          <artifactId>servlet-api-2.5</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>com.sun.jersey</groupId>
+          <artifactId>jersey-core</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>com.sun.jersey</groupId>
+          <artifactId>jersey-json</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>com.sun.jersey</groupId>
+          <artifactId>jersey-server</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.mortbay.jetty</groupId>
+          <artifactId>jetty</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.mortbay.jetty</groupId>
+          <artifactId>jetty-util</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>tomcat</groupId>
+          <artifactId>jasper-runtime</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>tomcat</groupId>
+          <artifactId>jasper-compiler</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.jruby</groupId>
+          <artifactId>jruby-complete</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.jboss.netty</groupId>
+          <artifactId>netty</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>io.netty</groupId>
+          <artifactId>netty</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-it</artifactId>
+      <version>${hbase.version}</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-compiler-plugin</artifactId>
+      </plugin>
+
+      <plugin>
+        <groupId>net.alchim31.maven</groupId>
+        <artifactId>scala-maven-plugin</artifactId>
+        <version>3.2.0</version>
+        <configuration>
+          <charset>${project.build.sourceEncoding}</charset>
+          <jvmArgs>
+            <jvmArg>-Xmx1024m</jvmArg>
+          </jvmArgs>
+          <scalaVersion>${scala.version}</scalaVersion>
+        </configuration>
+        <executions>
+          <execution>
+            <id>scala-compile-first</id>
+            <phase>process-resources</phase>
+            <goals>
+              <goal>add-source</goal>
+              <goal>compile</goal>
+            </goals>
+          </execution>
+          <execution>
+            <id>scala-test-compile</id>
+            <phase>process-test-resources</phase>
+            <goals>
+              <goal>testCompile</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+
+      <plugin>
+        <groupId>org.scalatest</groupId>
+        <artifactId>scalatest-maven-plugin</artifactId>
+        <version>1.0</version>
+        <configuration>
+          <reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory>
+          <junitxml>.</junitxml>
+          <filereports>WDF TestSuite.txt</filereports>
+        </configuration>
+        <executions>
+          <execution>
+            <id>test</id>
+            <phase>test</phase>
+            <goals>
+              <goal>test</goal>
+            </goals>
+            <configuration>
+              <parallel>true</parallel>
+              <tagsToExclude>Integration-Test</tagsToExclude>
+            </configuration>
+          </execution>
+          <execution>
+            <id>integration-test</id>
+            <phase>integration-test</phase>
+            <goals>
+              <goal>test</goal>
+            </goals>
+            <configuration>
+              <parallel>false</parallel>
+              <tagsToInclude>Integration-Test</tagsToInclude>
+              <argLine>-Xmx3g -XX:MaxPermSize=512m -XX:ReservedCodeCacheSize=512m</argLine>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+</project>

http://git-wip-us.apache.org/repos/asf/phoenix/blob/976e37c0/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRDD.scala
----------------------------------------------------------------------
diff --git a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRDD.scala b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRDD.scala
new file mode 100644
index 0000000..b27f9f9
--- /dev/null
+++ b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRDD.scala
@@ -0,0 +1,166 @@
+/*
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+ */
+package org.apache.phoenix.spark
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.hbase.HConstants
+import org.apache.hadoop.io.NullWritable
+import org.apache.phoenix.mapreduce.PhoenixInputFormat
+import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil
+import org.apache.phoenix.schema.types._
+import org.apache.phoenix.util.ColumnInfo
+import org.apache.spark._
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.expressions.GenericMutableRow
+import org.apache.spark.sql.types.{DataType, StructField, StructType}
+import org.apache.spark.sql.{Row, DataFrame, SQLContext}
+import org.apache.spark.sql.types._
+import scala.collection.JavaConverters._
+
+class PhoenixRDD(sc: SparkContext, table: String, columns: Seq[String],
+                 predicate: Option[String] = None, zkUrl: Option[String] = None,
+                 @transient conf: Configuration)
+  extends RDD[PhoenixRecordWritable](sc, Nil) with Logging {
+
+  @transient lazy val phoenixConf = {
+    getPhoenixConfiguration
+  }
+
+  val phoenixRDD = sc.newAPIHadoopRDD(phoenixConf,
+    classOf[PhoenixInputFormat[PhoenixRecordWritable]],
+    classOf[NullWritable],
+    classOf[PhoenixRecordWritable])
+
+  override protected def getPartitions: Array[Partition] = {
+    phoenixRDD.partitions
+  }
+
+  @DeveloperApi
+  override def compute(split: Partition, context: TaskContext) = {
+    phoenixRDD.compute(split, context).map(r => r._2)
+  }
+
+  def printPhoenixConfig(conf: Configuration): Unit = {
+    for (mapEntry <- conf.iterator().asScala) {
+      val k = mapEntry.getKey
+      val v = mapEntry.getValue
+
+      if (k.startsWith("phoenix")) {
+        println(s"$k = $v")
+      }
+    }
+  }
+
+  def buildSql(table: String, columns: Seq[String], predicate: Option[String]): String = {
+    val query = "SELECT %s FROM \"%s\"".format(
+      columns.map(f => "\"" + f + "\"").mkString(", "),
+      table
+    )
+
+    query + (predicate match {
+      case Some(p: String) => " WHERE " + p
+      case _ => ""
+    })
+  }
+
+  def getPhoenixConfiguration: Configuration = {
+
+    // The Hadoop Configuration isn't serializable, so don't try to serialize it. Clone it
+    // instead, because PhoenixConfigurationUtil mutates it.
+    val config = new Configuration(conf)
+
+    PhoenixConfigurationUtil.setInputQuery(config, buildSql(table, columns, predicate))
+    PhoenixConfigurationUtil.setSelectColumnNames(config, columns.mkString(","))
+    PhoenixConfigurationUtil.setInputTableName(config, "\"" + table + "\"")
+    PhoenixConfigurationUtil.setInputClass(config, classOf[PhoenixRecordWritable])
+
+    // Override the Zookeeper URL if present. Throw exception if no address given.
+    zkUrl match {
+      case Some(url) => config.set(HConstants.ZOOKEEPER_QUORUM, url )
+      case _ => {
+        if(config.get(HConstants.ZOOKEEPER_QUORUM) == null) {
+          throw new UnsupportedOperationException(
+            s"One of zkUrl or '${HConstants.ZOOKEEPER_QUORUM}' config property must be provided"
+          )
+        }
+      }
+    }
+
+    config
+  }
+
+  // Convert our PhoenixRDD to a DataFrame
+  def toDataFrame(sqlContext: SQLContext): DataFrame = {
+    val columnList = PhoenixConfigurationUtil
+      .getSelectColumnMetadataList(new Configuration(phoenixConf))
+      .asScala
+
+    val columnNames: Seq[String] = columnList.map(ci => {
+      ci.getDisplayName
+    })
+
+    // Lookup the Spark catalyst types from the Phoenix schema
+    val structFields = phoenixSchemaToCatalystSchema(columnList).toArray
+
+    // Create the data frame from the converted Spark schema
+    sqlContext.createDataFrame(map(pr => {
+      val values = pr.resultMap
+      val row = new GenericMutableRow(values.size)
+
+      columnNames.zipWithIndex.foreach {
+        case (columnName, i) => {
+          row.update(i, values(columnName))
+        }
+      }
+
+      row.asInstanceOf[Row]
+    }), new StructType(structFields))
+  }
+
+  def phoenixSchemaToCatalystSchema(columnList: Seq[ColumnInfo]) = {
+    columnList.map(ci => {
+      val structType = phoenixTypeToCatalystType(ci.getPDataType)
+      StructField(ci.getDisplayName, structType)
+    })
+  }
+
+  // Lookup table for Phoenix types to Spark catalyst types
+  def phoenixTypeToCatalystType(phoenixType: PDataType[_]): DataType = phoenixType match {
+    case t if t.isInstanceOf[PVarchar] || t.isInstanceOf[PChar] => StringType
+    case t if t.isInstanceOf[PLong] || t.isInstanceOf[PUnsignedLong] => LongType
+    case t if t.isInstanceOf[PInteger] || t.isInstanceOf[PUnsignedInt] => IntegerType
+    case t if t.isInstanceOf[PFloat] || t.isInstanceOf[PUnsignedFloat] => FloatType
+    case t if t.isInstanceOf[PDouble] || t.isInstanceOf[PUnsignedDouble] => DoubleType
+    case t if t.isInstanceOf[PDecimal] => DecimalType(None)
+    case t if t.isInstanceOf[PTimestamp] || t.isInstanceOf[PUnsignedTimestamp] => TimestampType
+    case t if t.isInstanceOf[PTime] || t.isInstanceOf[PUnsignedTime] => TimestampType
+    case t if t.isInstanceOf[PDate] || t.isInstanceOf[PUnsignedDate] => TimestampType
+    case t if t.isInstanceOf[PBoolean] => BooleanType
+    case t if t.isInstanceOf[PVarbinary] || t.isInstanceOf[PBinary] => BinaryType
+    case t if t.isInstanceOf[PIntegerArray] || t.isInstanceOf[PUnsignedIntArray] => ArrayType(IntegerType, containsNull = true)
+    case t if t.isInstanceOf[PBooleanArray] => ArrayType(BooleanType, containsNull = true)
+    case t if t.isInstanceOf[PVarcharArray] || t.isInstanceOf[PCharArray] => ArrayType(StringType, containsNull = true)
+    case t if t.isInstanceOf[PVarbinaryArray] || t.isInstanceOf[PBinaryArray] => ArrayType(BinaryType, containsNull = true)
+    case t if t.isInstanceOf[PLongArray] || t.isInstanceOf[PUnsignedLongArray] => ArrayType(LongType, containsNull = true)
+    case t if t.isInstanceOf[PSmallintArray] || t.isInstanceOf[PUnsignedSmallintArray] => ArrayType(IntegerType, containsNull = true)
+    case t if t.isInstanceOf[PTinyintArray] || t.isInstanceOf[PUnsignedTinyintArray] => ArrayType(ByteType, containsNull = true)
+    case t if t.isInstanceOf[PFloatArray] || t.isInstanceOf[PUnsignedFloatArray] => ArrayType(FloatType, containsNull = true)
+    case t if t.isInstanceOf[PDoubleArray] || t.isInstanceOf[PUnsignedDoubleArray] => ArrayType(DoubleType, containsNull = true)
+    case t if t.isInstanceOf[PDecimalArray] => ArrayType(DecimalType(None), containsNull = true)
+    case t if t.isInstanceOf[PTimestampArray] || t.isInstanceOf[PUnsignedTimestampArray] => ArrayType(TimestampType, containsNull = true)
+    case t if t.isInstanceOf[PDateArray] || t.isInstanceOf[PUnsignedDateArray] => ArrayType(TimestampType, containsNull = true)
+    case t if t.isInstanceOf[PTimeArray] || t.isInstanceOf[PUnsignedTimeArray] => ArrayType(TimestampType, containsNull = true)
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/976e37c0/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRecordWritable.scala
----------------------------------------------------------------------
diff --git a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRecordWritable.scala b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRecordWritable.scala
new file mode 100644
index 0000000..48a70ec
--- /dev/null
+++ b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRecordWritable.scala
@@ -0,0 +1,89 @@
+/*
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+ */
+package org.apache.phoenix.spark
+
+import java.sql.{PreparedStatement, ResultSet}
+import org.apache.hadoop.mapreduce.lib.db.DBWritable
+import org.apache.phoenix.mapreduce.util.ColumnInfoToStringEncoderDecoder
+import org.apache.phoenix.schema.types.{PDate, PhoenixArray}
+import org.joda.time.DateTime
+import scala.collection.{immutable, mutable}
+import scala.collection.JavaConversions._
+
+class PhoenixRecordWritable(var encodedColumns: String) extends DBWritable {
+  val upsertValues = mutable.ArrayBuffer[Any]()
+  val resultMap = mutable.Map[String, AnyRef]()
+
+  def result : immutable.Map[String, AnyRef] = {
+    resultMap.toMap
+  }
+
+  override def write(statement: PreparedStatement): Unit = {
+    // Decode the ColumnInfo list
+    val columns = ColumnInfoToStringEncoderDecoder.decode(encodedColumns).toList
+
+    // Make sure we at least line up in size
+    if(upsertValues.length != columns.length) {
+      throw new UnsupportedOperationException(
+        s"Upsert values ($upsertValues) do not match the specified columns ($columns)"
+      )
+    }
+
+    // Correlate each value (v) to a column type (c) and an index (i)
+    upsertValues.zip(columns).zipWithIndex.foreach {
+      case ((v, c), i) => {
+        if (v != null) {
+          // Both Java and Joda dates used to work in 4.2.3, but now they must be java.sql.Date
+          val (finalObj, finalType) = v match {
+            case dt: DateTime => (new java.sql.Date(dt.getMillis), PDate.INSTANCE.getSqlType)
+            case d: java.util.Date => (new java.sql.Date(d.getTime), PDate.INSTANCE.getSqlType)
+            case _ => (v, c.getSqlType)
+          }
+          statement.setObject(i + 1, finalObj, finalType)
+        } else {
+          statement.setNull(i + 1, c.getSqlType)
+        }
+      }
+    }
+  }
+
+  override def readFields(resultSet: ResultSet): Unit = {
+    val metadata = resultSet.getMetaData
+    for(i <- 1 to metadata.getColumnCount) {
+
+      // Return the contents of a PhoenixArray, if necessary
+      val value = resultSet.getObject(i) match {
+        case x: PhoenixArray => x.getArray
+        case y => y
+      }
+
+      // Put a (ColumnLabel -> value) entry in the result map
+      resultMap(metadata.getColumnLabel(i)) = value
+    }
+  }
+
+  def add(value: Any): Unit = {
+    upsertValues.append(value)
+  }
+
+  // Empty constructor for MapReduce
+  def this() = {
+    this("")
+  }
+
+  // Encoded columns are a Phoenix-serialized representation of the column meta data
+  def setEncodedColumns(encodedColumns: String) {
+    this.encodedColumns = encodedColumns
+  }
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/976e37c0/phoenix-spark/src/main/scala/org/apache/phoenix/spark/ProductRDDFunctions.scala
----------------------------------------------------------------------
diff --git a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/ProductRDDFunctions.scala b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/ProductRDDFunctions.scala
new file mode 100644
index 0000000..2926569
--- /dev/null
+++ b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/ProductRDDFunctions.scala
@@ -0,0 +1,68 @@
+/*
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+ */
+package org.apache.phoenix.spark
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.hbase.HConstants
+import org.apache.hadoop.io.NullWritable
+import org.apache.phoenix.mapreduce.PhoenixOutputFormat
+import org.apache.phoenix.mapreduce.util.{ColumnInfoToStringEncoderDecoder, PhoenixConfigurationUtil}
+import org.apache.spark.Logging
+import org.apache.spark.rdd.RDD
+
+class ProductRDDFunctions[A <: Product](data: RDD[A]) extends Logging with Serializable {
+
+  def saveToPhoenix(tableName: String, cols: Seq[String],
+                    conf: Configuration = new Configuration, zkUrl: Option[String] = None)
+                    : Unit = {
+
+    // Setup Phoenix output configuration, make a local copy
+    val config = new Configuration(conf)
+    PhoenixConfigurationUtil.setOutputTableName(config, tableName)
+    PhoenixConfigurationUtil.setUpsertColumnNames(config, cols.mkString(","))
+
+    // Override the Zookeeper URL if present. Throw exception if no address given.
+    zkUrl match {
+      case Some(url) => config.set(HConstants.ZOOKEEPER_QUORUM, url )
+      case _ => {
+        if(config.get(HConstants.ZOOKEEPER_QUORUM) == null) {
+          throw new UnsupportedOperationException(
+            s"One of zkUrl or '${HConstants.ZOOKEEPER_QUORUM}' config property must be provided"
+          )
+        }
+      }
+    }
+
+    // Encode the column info to a serializable type
+    val encodedColumns = ColumnInfoToStringEncoderDecoder.encode(
+      PhoenixConfigurationUtil.getUpsertColumnMetadataList(config)
+    )
+
+    // Map each element of the product to a new (NullWritable, PhoenixRecordWritable)
+    val phxRDD: RDD[(NullWritable, PhoenixRecordWritable)] = data.map { e =>
+      val rec = new PhoenixRecordWritable(encodedColumns)
+      e.productIterator.foreach { rec.add(_) }
+      (null, rec)
+    }
+
+    // Save it
+    phxRDD.saveAsNewAPIHadoopFile(
+      "",
+      classOf[NullWritable],
+      classOf[PhoenixRecordWritable],
+      classOf[PhoenixOutputFormat[PhoenixRecordWritable]],
+      config
+    )
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/976e37c0/phoenix-spark/src/main/scala/org/apache/phoenix/spark/SparkContextFunctions.scala
----------------------------------------------------------------------
diff --git a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/SparkContextFunctions.scala b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/SparkContextFunctions.scala
new file mode 100644
index 0000000..a3cd8f0
--- /dev/null
+++ b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/SparkContextFunctions.scala
@@ -0,0 +1,41 @@
+/*
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+ */
+package org.apache.phoenix.spark
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.spark.SparkContext
+import org.apache.spark.rdd.RDD
+
+class SparkContextFunctions(@transient val sc: SparkContext) extends Serializable {
+
+  /*
+    This will return an RDD of Map[String, AnyRef], where the String key corresponds to the column
+    name and the AnyRef value will be a java.sql type as returned by Phoenix
+
+    'table' is the corresponding Phoenix table
+    'columns' is a sequence of columns to query
+    'predicate' is a set of statements to go after a WHERE clause, e.g. "TID = 123"
+    'zkUrl' is an optional Zookeeper URL to use to connect to Phoenix
+    'conf' is a Hadoop Configuration object. If zkUrl is not set, the "hbase.zookeeper.quorum"
+      property will be used
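+
+    An illustrative call (the quorum address is hypothetical; TABLE1, ID and COL1 are the
+    example columns from the README):
+      sc.phoenixTableAsRDD("TABLE1", Seq("ID", "COL1"),
+        predicate = Some("ID > 1"), zkUrl = Some("phoenix-server:2181"))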
+   */
+
+  def phoenixTableAsRDD(table: String, columns: Seq[String], predicate: Option[String] = None,
+                        zkUrl: Option[String] = None, conf: Configuration = new Configuration())
+                        : RDD[Map[String, AnyRef]] = {
+
+    // Create a PhoenixRDD, but only return the serializable 'result' map
+    new PhoenixRDD(sc, table, columns, predicate, zkUrl, conf).map(_.result)
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/976e37c0/phoenix-spark/src/main/scala/org/apache/phoenix/spark/SparkSqlContextFunctions.scala
----------------------------------------------------------------------
diff --git a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/SparkSqlContextFunctions.scala b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/SparkSqlContextFunctions.scala
new file mode 100644
index 0000000..cc3f378
--- /dev/null
+++ b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/SparkSqlContextFunctions.scala
@@ -0,0 +1,39 @@
+/*
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+ */
+package org.apache.phoenix.spark
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.spark.sql.{DataFrame, SQLContext}
+
+class SparkSqlContextFunctions(@transient val sqlContext: SQLContext) extends Serializable {
+
+  /*
+  This will return a Spark DataFrame, with Phoenix types converted to Spark SQL catalyst types
+
+  'table' is the corresponding Phoenix table
+  'columns' is a sequence of columns to query
+  'predicate' is a set of statements to go after a WHERE clause, e.g. "TID = 123"
+  'zkUrl' is an optional Zookeeper URL to use to connect to Phoenix
+  'conf' is a Hadoop Configuration object. If zkUrl is not set, the "hbase.zookeeper.quorum"
+    property will be used
+ */
+  def phoenixTableAsDataFrame(table: String, columns: Seq[String],
+                               predicate: Option[String] = None, zkUrl: Option[String] = None,
+                               conf: Configuration = new Configuration): DataFrame = {
+
+    // Create the PhoenixRDD and convert it to a DataFrame
+    new PhoenixRDD(sqlContext.sparkContext, table, columns, predicate, zkUrl, conf)
+      .toDataFrame(sqlContext)
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/976e37c0/phoenix-spark/src/main/scala/org/apache/phoenix/spark/package.scala
----------------------------------------------------------------------
diff --git a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/package.scala b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/package.scala
new file mode 100644
index 0000000..c19ec16
--- /dev/null
+++ b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/package.scala
@@ -0,0 +1,32 @@
+/*
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+ */
+package org.apache.phoenix
+
+import org.apache.spark.SparkContext
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.SQLContext
+
+package object spark {
+  implicit def toProductRDDFunctions[A <: Product](rdd: RDD[A]): ProductRDDFunctions[A] = {
+    new ProductRDDFunctions[A](rdd)
+  }
+
+  implicit def toSparkContextFunctions(sc: SparkContext): SparkContextFunctions = {
+    new SparkContextFunctions(sc)
+  }
+
+  implicit def toSparkSqlContextFunctions(sqlContext: SQLContext): SparkSqlContextFunctions = {
+    new SparkSqlContextFunctions(sqlContext)
+  }
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/976e37c0/phoenix-spark/src/test/resources/log4j.xml
----------------------------------------------------------------------
diff --git a/phoenix-spark/src/test/resources/log4j.xml b/phoenix-spark/src/test/resources/log4j.xml
new file mode 100644
index 0000000..d4799da
--- /dev/null
+++ b/phoenix-spark/src/test/resources/log4j.xml
@@ -0,0 +1,41 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd">
+
+<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/">
+  <appender name="console" class="org.apache.log4j.ConsoleAppender">
+    <param name="Target" value="System.out"/>
+
+    <layout class="org.apache.log4j.PatternLayout">
+      <param name="ConversionPattern" value="%-4r [%t] %-5p %c %x - %m%n"/>
+    </layout>
+  </appender>
+
+  <logger name="org.eclipse">
+    <level value="ERROR"/>
+  </logger>
+
+  <logger name="org.apache">
+    <level value="ERROR"/>
+  </logger>
+
+  <logger name = "org.apache.phoenix.mapreduce">
+    <level value="FATAL"/>
+  </logger>
+
+  <logger name="org.mortbay">
+    <level value="ERROR"/>
+  </logger>
+
+  <logger name="BlockStateChange">
+    <level value="ERROR"/>
+  </logger>
+
+  <logger name="io.netty">
+    <level value="ERROR"/>
+  </logger>
+
+  <root>
+    <priority value="INFO"/>
+    <appender-ref ref="console"/>
+  </root>
+</log4j:configuration>

http://git-wip-us.apache.org/repos/asf/phoenix/blob/976e37c0/phoenix-spark/src/test/resources/setup.sql
----------------------------------------------------------------------
diff --git a/phoenix-spark/src/test/resources/setup.sql b/phoenix-spark/src/test/resources/setup.sql
new file mode 100644
index 0000000..14a7e7e
--- /dev/null
+++ b/phoenix-spark/src/test/resources/setup.sql
@@ -0,0 +1,18 @@
+CREATE TABLE table1 (id BIGINT NOT NULL PRIMARY KEY, col1 VARCHAR)
+CREATE TABLE table2 (id BIGINT NOT NULL PRIMARY KEY, table1_id BIGINT, "t2col1" VARCHAR)
+UPSERT INTO table1 (id, col1) VALUES (1, 'test_row_1')
+UPSERT INTO table2 (id, table1_id, "t2col1") VALUES (1, 1, 'test_child_1')
+UPSERT INTO table2 (id, table1_id, "t2col1") VALUES (2, 1, 'test_child_2')
+UPSERT INTO table1 (id, col1) VALUES (2, 'test_row_2')
+UPSERT INTO table2 (id, table1_id, "t2col1") VALUES (3, 2, 'test_child_1')
+UPSERT INTO table2 (id, table1_id, "t2col1") VALUES (4, 2, 'test_child_2')
+UPSERT INTO table2 (id, table1_id, "t2col1") VALUES (5, 2, 'test_child_3')
+UPSERT INTO table2 (id, table1_id, "t2col1") VALUES (6, 2, 'test_child_4')
+CREATE TABLE "table3" ("id" BIGINT NOT NULL PRIMARY KEY, "col1" VARCHAR)
+UPSERT INTO "table3" ("id", "col1") VALUES (1, 'foo')
+UPSERT INTO "table3" ("id", "col1") VALUES (2, 'bar')
+CREATE TABLE ARRAY_TEST_TABLE (ID BIGINT NOT NULL PRIMARY KEY, VCARRAY VARCHAR[])
+UPSERT INTO ARRAY_TEST_TABLE (ID, VCARRAY) VALUES (1, ARRAY['String1', 'String2', 'String3'])
+CREATE TABLE DATE_PREDICATE_TEST_TABLE (ID BIGINT NOT NULL, TIMESERIES_KEY TIMESTAMP NOT NULL CONSTRAINT pk PRIMARY KEY (ID, TIMESERIES_KEY))
+UPSERT INTO DATE_PREDICATE_TEST_TABLE (ID, TIMESERIES_KEY) VALUES (1, CAST(CURRENT_TIME() AS TIMESTAMP))
+CREATE TABLE OUTPUT_TEST_TABLE (id BIGINT NOT NULL PRIMARY KEY, col1 VARCHAR, col2 INTEGER, col3 DATE)
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/976e37c0/phoenix-spark/src/test/scala/org/apache/phoenix/spark/PhoenixRDDTest.scala
----------------------------------------------------------------------
diff --git a/phoenix-spark/src/test/scala/org/apache/phoenix/spark/PhoenixRDDTest.scala b/phoenix-spark/src/test/scala/org/apache/phoenix/spark/PhoenixRDDTest.scala
new file mode 100644
index 0000000..876f8a4
--- /dev/null
+++ b/phoenix-spark/src/test/scala/org/apache/phoenix/spark/PhoenixRDDTest.scala
@@ -0,0 +1,333 @@
+/*
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+ */
+package org.apache.phoenix.spark
+
+import java.sql.{Connection, DriverManager}
+import java.util.Date
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.hbase.{HConstants, HBaseTestingUtility}
+import org.apache.phoenix.schema.ColumnNotFoundException
+import org.apache.phoenix.schema.types.PVarchar
+import org.apache.phoenix.util.ColumnInfo
+import org.apache.spark.sql.SQLContext
+import org.apache.spark.sql.types.{StringType, StructField}
+import org.apache.spark.{SparkConf, SparkContext}
+import org.joda.time.DateTime
+import org.scalatest.{BeforeAndAfterAll, FunSuite, Matchers}
+import org.apache.phoenix.spark._
+
+import scala.collection.mutable.ListBuffer
+
+class PhoenixRDDTest extends FunSuite with Matchers with BeforeAndAfterAll {
+  lazy val hbaseTestingUtility = {
+    new HBaseTestingUtility()
+  }
+
+  lazy val hbaseConfiguration = {
+    val conf = hbaseTestingUtility.getConfiguration
+
+    val quorum = conf.get("hbase.zookeeper.quorum")
+    val clientPort = conf.get("hbase.zookeeper.property.clientPort")
+    val znodeParent = conf.get("zookeeper.znode.parent")
+
+    // This is an odd one - the Zookeeper Quorum entry in the config is totally wrong. It's
+    // just reporting localhost.
+    conf.set(org.apache.hadoop.hbase.HConstants.ZOOKEEPER_QUORUM, s"$quorum:$clientPort:$znodeParent")
+
+    conf
+  }
+
+  lazy val quorumAddress = {
+    hbaseConfiguration.get("hbase.zookeeper.quorum")
+  }
+
+  lazy val zookeeperClientPort = {
+    hbaseConfiguration.get("hbase.zookeeper.property.clientPort")
+  }
+
+  lazy val zookeeperZnodeParent = {
+    hbaseConfiguration.get("zookeeper.znode.parent")
+  }
+
+  lazy val hbaseConnectionString = {
+    s"$quorumAddress:$zookeeperClientPort:$zookeeperZnodeParent"
+  }
+
+  var conn: Connection = _
+
+  override def beforeAll() {
+    hbaseTestingUtility.startMiniCluster()
+
+    conn = DriverManager.getConnection(s"jdbc:phoenix:$hbaseConnectionString")
+
+    conn.setAutoCommit(true)
+
+    // Each SQL statement used to set up Phoenix must be on a single line, which
+    // can make for some long lines.
+    val setupSqlSource = getClass.getClassLoader.getResourceAsStream("setup.sql")
+
+    val setupSql = scala.io.Source.fromInputStream(setupSqlSource).getLines()
+
+    for (sql <- setupSql) {
+      val stmt = conn.createStatement()
+
+      stmt.execute(sql)
+
+      stmt.close()
+    }
+
+    conn.commit()
+  }
+
+  override def afterAll() {
+    conn.close()
+    hbaseTestingUtility.shutdownMiniCluster()
+  }
+
+  val conf = new SparkConf().set("spark.ui.showConsoleProgress", "false")
+
+  val sc = new SparkContext("local[1]", "PhoenixSparkTest", conf)
+
+  def buildSql(table: String, columns: Seq[String], predicate: Option[String]): String = {
+    val query = "SELECT %s FROM \"%s\"" format(columns.map(f => "\"" + f + "\"").mkString(", "), table)
+
+    query + (predicate match {
+      case Some(p: String) => " WHERE " + p
+      case _ => ""
+    })
+  }
+
+  test("Can create valid SQL") {
+    val rdd = new PhoenixRDD(sc, "MyTable", Array("Foo", "Bar"),
+      conf = hbaseConfiguration)
+
+    rdd.buildSql("MyTable", Array("Foo", "Bar"), None) should
+      equal("SELECT \"Foo\", \"Bar\" FROM \"MyTable\"")
+  }
+
+  test("Can convert Phoenix schema") {
+    val phoenixSchema = List(
+      new ColumnInfo("varcharColumn", PVarchar.INSTANCE.getSqlType)
+    )
+
+    val rdd = new PhoenixRDD(sc, "MyTable", Array("Foo", "Bar"),
+      conf = hbaseConfiguration)
+
+    val catalystSchema = rdd.phoenixSchemaToCatalystSchema(phoenixSchema)
+
+    val expected = List(StructField("varcharColumn", StringType, nullable = true))
+
+    catalystSchema shouldEqual expected
+  }
+
+  test("Can create schema RDD and execute query") {
+    val sqlContext = new SQLContext(sc)
+
+    val df1 = sqlContext.phoenixTableAsDataFrame("TABLE1", Array("ID", "COL1"), conf = hbaseConfiguration)
+
+    df1.registerTempTable("sql_table_1")
+
+    val df2 = sqlContext.phoenixTableAsDataFrame("TABLE2", Array("ID", "TABLE1_ID"),
+      conf = hbaseConfiguration)
+
+    df2.registerTempTable("sql_table_2")
+
+    val sqlRdd = sqlContext.sql("SELECT t1.ID, t1.COL1, t2.ID, t2.TABLE1_ID FROM sql_table_1 AS t1 INNER JOIN sql_table_2 AS t2 ON (t2.TABLE1_ID = t1.ID)")
+
+    val count = sqlRdd.count()
+
+    count shouldEqual 6L
+  }
+
+  test("Can create schema RDD and execute query on case sensitive table (no config)") {
+    val sqlContext = new SQLContext(sc)
+
+    val df1 = sqlContext.phoenixTableAsDataFrame("table3", Array("id", "col1"), zkUrl = Some(hbaseConnectionString))
+
+    df1.registerTempTable("table3")
+
+    val sqlRdd = sqlContext.sql("SELECT * FROM table3")
+
+    val count = sqlRdd.count()
+
+    count shouldEqual 2L
+  }
+
+  test("Can create schema RDD and execute constrained query") {
+    val sqlContext = new SQLContext(sc)
+
+    val df1 = sqlContext.phoenixTableAsDataFrame("TABLE1", Array("ID", "COL1"), conf = hbaseConfiguration)
+
+    df1.registerTempTable("sql_table_1")
+
+    val df2 = sqlContext.phoenixTableAsDataFrame("TABLE2", Array("ID", "TABLE1_ID"),
+      predicate = Some("\"ID\" = 1"),
+      conf = hbaseConfiguration)
+
+    df2.registerTempTable("sql_table_2")
+
+    val sqlRdd = sqlContext.sql("SELECT t1.ID, t1.COL1, t2.ID, t2.TABLE1_ID FROM sql_table_1 AS t1 INNER JOIN sql_table_2 AS t2 ON (t2.TABLE1_ID = t1.ID)")
+
+    val count = sqlRdd.count()
+
+    count shouldEqual 1L
+  }
+
+  test("Using a predicate referring to a non-existent column should fail") {
+    intercept[RuntimeException] {
+      val sqlContext = new SQLContext(sc)
+
+      val df1 = sqlContext.phoenixTableAsDataFrame("table3", Array("id", "col1"),
+        predicate = Some("foo = bar"),
+        conf = hbaseConfiguration)
+
+      df1.registerTempTable("table3")
+
+      val sqlRdd = sqlContext.sql("SELECT * FROM table3")
+
+      // we have to execute an action before the predicate failure can occur
+      val count = sqlRdd.count()
+    }.getCause shouldBe a [ColumnNotFoundException]
+  }
+
+  test("Can create schema RDD with predicate that will never match") {
+    val sqlContext = new SQLContext(sc)
+
+    val df1 = sqlContext.phoenixTableAsDataFrame("table3", Array("id", "col1"),
+      predicate = Some("\"id\" = -1"),
+      conf = hbaseConfiguration)
+
+    df1.registerTempTable("table3")
+
+    val sqlRdd = sqlContext.sql("SELECT * FROM table3")
+
+    val count = sqlRdd.count()
+
+    count shouldEqual 0L
+  }
+
+  test("Can create schema RDD with complex predicate") {
+    val sqlContext = new SQLContext(sc)
+
+    val df1 = sqlContext.phoenixTableAsDataFrame("DATE_PREDICATE_TEST_TABLE", Array("ID", "TIMESERIES_KEY"),
+      predicate = Some("ID > 0 AND TIMESERIES_KEY BETWEEN CAST(TO_DATE('1990-01-01 00:00:01', 'yyyy-MM-dd HH:mm:ss') AS TIMESTAMP) AND CAST(TO_DATE('1990-01-30 00:00:01', 'yyyy-MM-dd HH:mm:ss') AS TIMESTAMP)"),
+      conf = hbaseConfiguration)
+
+    df1.registerTempTable("date_predicate_test_table")
+
+    val sqlRdd = df1.sqlContext.sql("SELECT * FROM date_predicate_test_table")
+
+    val count = sqlRdd.count()
+
+    count shouldEqual 0L
+  }
+
+  test("Can query an array table") {
+    val sqlContext = new SQLContext(sc)
+
+    val df1 = sqlContext.phoenixTableAsDataFrame("ARRAY_TEST_TABLE", Array("ID", "VCARRAY"),
+      conf = hbaseConfiguration)
+
+    df1.registerTempTable("ARRAY_TEST_TABLE")
+
+    val sqlRdd = sqlContext.sql("SELECT * FROM ARRAY_TEST_TABLE")
+
+    val count = sqlRdd.count()
+
+    // get row 0, column 1, which should be "VCARRAY"
+    val arrayValues = sqlRdd.collect().apply(0).apply(1)
+
+    arrayValues should equal(Array("String1", "String2", "String3"))
+
+    count shouldEqual 1L
+  }
+
+  test("Can read a table as an RDD") {
+    val rdd1 = sc.phoenixTableAsRDD("ARRAY_TEST_TABLE", Seq("ID", "VCARRAY"),
+      conf = hbaseConfiguration)
+
+    val count = rdd1.count()
+
+    val arrayValues = rdd1.take(1)(0)("VCARRAY")
+
+    arrayValues should equal(Array("String1", "String2", "String3"))
+
+    count shouldEqual 1L
+  }
+
+  test("Can save to phoenix table") {
+    val sqlContext = new SQLContext(sc)
+
+    val dataSet = List((1L, "1", 1), (2L, "2", 2), (3L, "3", 3))
+
+    sc
+      .parallelize(dataSet)
+      .saveToPhoenix(
+        "OUTPUT_TEST_TABLE",
+        Seq("ID","COL1","COL2"),
+        hbaseConfiguration
+      )
+
+    // Load the results back
+    val stmt = conn.createStatement()
+    val rs = stmt.executeQuery("SELECT ID, COL1, COL2 FROM OUTPUT_TEST_TABLE")
+    val results = ListBuffer[(Long, String, Int)]()
+    while(rs.next()) {
+      results.append((rs.getLong(1), rs.getString(2), rs.getInt(3)))
+    }
+    stmt.close()
+
+    // Verify they match
+    (0 to results.size - 1).foreach { i =>
+      dataSet(i) shouldEqual results(i)
+    }
+  }
+
+  test("Can save Java and Joda dates to Phoenix (no config)") {
+    val dt = new DateTime()
+    val date = new Date()
+
+    val dataSet = List((1L, "1", 1, dt), (2L, "2", 2, date))
+    sc
+      .parallelize(dataSet)
+      .saveToPhoenix(
+        "OUTPUT_TEST_TABLE",
+        Seq("ID","COL1","COL2","COL3"),
+        zkUrl = Some(hbaseConnectionString)
+      )
+
+    // Load the results back
+    val stmt = conn.createStatement()
+    val rs = stmt.executeQuery("SELECT COL3 FROM OUTPUT_TEST_TABLE WHERE ID = 1 OR ID = 2 ORDER BY ID ASC")
+    val results = ListBuffer[java.sql.Date]()
+    while(rs.next()) {
+      results.append(rs.getDate(1))
+    }
+    stmt.close()
+
+    // Verify the epochs are equal
+    results(0).getTime shouldEqual dt.getMillis
+    results(1).getTime shouldEqual date.getTime
+  }
+
+  test("Not specifying a zkUrl or a config quorum URL should fail") {
+    intercept[UnsupportedOperationException] {
+      val sqlContext = new SQLContext(sc)
+      val badConf = new Configuration(hbaseConfiguration)
+      badConf.unset(HConstants.ZOOKEEPER_QUORUM)
+      sqlContext.phoenixTableAsDataFrame("TABLE1", Array("ID", "COL1"), conf = badConf)
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/976e37c0/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 0675d81..9edf084 100644
--- a/pom.xml
+++ b/pom.xml
@@ -28,6 +28,7 @@
     <module>phoenix-pig</module>
     <module>phoenix-assembly</module>
     <module>phoenix-pherf</module>
+    <module>phoenix-spark</module>
   </modules>
 
   <repositories>