You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@phoenix.apache.org by GitBox <gi...@apache.org> on 2019/01/08 06:25:11 UTC

[GitHub] twdsilva closed pull request #423: PHOENIX-5059 Use the Datasource v2 api in the spark connector

twdsilva closed pull request #423: PHOENIX-5059 Use the Datasource v2 api in the spark connector
URL: https://github.com/apache/phoenix/pull/423
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/salted/BaseSaltedTableIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/salted/BaseSaltedTableIT.java
index 3051cd66de..ef127ac737 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/salted/BaseSaltedTableIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/salted/BaseSaltedTableIT.java
@@ -194,7 +194,7 @@ public void testSelectValueWithFullyQualifiedWhereClause() throws Exception {
                 .setSelectColumns(
                         Lists.newArrayList("A_INTEGER", "A_STRING", "A_ID", "B_STRING", "B_INTEGER"))
                 .setFullTableName(tableName)
-                .setWhereClause("a_integer = 1 AND a_string >= 'ab' AND a_string < 'de' AND a_id = '123'");
+                .setWhereClause("A_INTEGER = 1 AND A_STRING >= 'ab' AND A_STRING < 'de' AND A_ID = '123'");
             rs = executeQuery(conn, queryBuilder);
             assertTrue(rs.next());
             assertEquals(1, rs.getInt(1));
@@ -205,7 +205,7 @@ public void testSelectValueWithFullyQualifiedWhereClause() throws Exception {
             assertFalse(rs.next());
 
             // all single slots with one value.
-            queryBuilder.setWhereClause("a_integer = 1 AND a_string = 'ab' AND a_id = '123'");
+            queryBuilder.setWhereClause("A_INTEGER = 1 AND A_STRING = 'ab' AND A_ID = '123'");
             rs = executeQuery(conn, queryBuilder);
             assertTrue(rs.next());
             assertEquals(1, rs.getInt(1));
@@ -216,7 +216,7 @@ public void testSelectValueWithFullyQualifiedWhereClause() throws Exception {
             assertFalse(rs.next());
 
             // all single slots with multiple values.
-            queryBuilder.setWhereClause("a_integer in (2, 4) AND a_string = 'abc' AND a_id = '123'");
+            queryBuilder.setWhereClause("A_INTEGER in (2, 4) AND A_STRING = 'abc' AND A_ID = '123'");
             rs = executeQuery(conn, queryBuilder);
 
             assertTrue(rs.next());
diff --git a/phoenix-spark/pom.xml b/phoenix-spark/pom.xml
index 038e314ccb..f426c83800 100644
--- a/phoenix-spark/pom.xml
+++ b/phoenix-spark/pom.xml
@@ -487,6 +487,14 @@
     <testSourceDirectory>src/it/scala</testSourceDirectory>
     <testResources><testResource><directory>src/it/resources</directory></testResource></testResources>
     <plugins>
+        <plugin>
+            <groupId>org.apache.maven.plugins</groupId>
+            <artifactId>maven-compiler-plugin</artifactId>
+            <configuration>
+                <source>1.8</source>
+                <target>1.8</target>
+            </configuration>
+        </plugin>
       <plugin>
         <groupId>org.codehaus.mojo</groupId>
         <artifactId>build-helper-maven-plugin</artifactId>
diff --git a/phoenix-spark/src/it/java/org/apache/phoenix/spark/OrderByIT.java b/phoenix-spark/src/it/java/org/apache/phoenix/spark/OrderByIT.java
index bdffaf554d..4c60bc81b3 100644
--- a/phoenix-spark/src/it/java/org/apache/phoenix/spark/OrderByIT.java
+++ b/phoenix-spark/src/it/java/org/apache/phoenix/spark/OrderByIT.java
@@ -16,12 +16,13 @@
 import java.util.Properties;
 
 import org.apache.phoenix.end2end.BaseOrderByIT;
+import org.apache.phoenix.spark.datasource.v2.PhoenixDataSource;
 import org.apache.phoenix.util.PropertiesUtil;
 import org.apache.phoenix.util.QueryBuilder;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SQLContext;
-import org.junit.Ignore;
+import org.apache.spark.sql.sources.v2.DataSourceOptions;
 import org.junit.Test;
 
 import com.google.common.collect.Lists;
@@ -106,22 +107,15 @@ public void testOrderByWithJoin() throws Exception {
 
             // create two PhoenixRDDs  using the table names and columns that are required for the JOIN query
             List<String> table1Columns = Lists.newArrayList("A_STRING", "CF1.A", "CF1.B", "COL1", "CF2.C", "CF2.D");
-            SQLContext sqlContext = SparkUtil.getSqlContext();
-            Dataset phoenixDataSet =
-                    new PhoenixRDD(SparkUtil.getSparkContext(), tableName1,
-                            JavaConverters.collectionAsScalaIterableConverter(table1Columns)
-                                    .asScala().toSeq(),
-                            Option.apply((String) null), Option.apply(getUrl()), config, false,
-                            null).toDataFrame(sqlContext);
-            phoenixDataSet.registerTempTable(tableName1);
-            List<String> table2Columns = Lists.newArrayList("A_STRING", "COL1");
-            phoenixDataSet =
-                    new PhoenixRDD(SparkUtil.getSparkContext(), tableName2,
-                            JavaConverters.collectionAsScalaIterableConverter(table2Columns)
-                                    .asScala().toSeq(),
-                            Option.apply((String) null), Option.apply(getUrl()), config, false,
-                            null).toDataFrame(sqlContext);
-            phoenixDataSet.registerTempTable(tableName2);
+            SQLContext sqlContext = SparkUtil.getSparkSession().sqlContext();
+            Dataset phoenixDataSet = SparkUtil.getSparkSession().read().format("phoenix")
+                    .option(DataSourceOptions.TABLE_KEY, tableName1)
+                    .option(PhoenixDataSource.ZOOKEEPER_URL, getUrl()).load();
+            phoenixDataSet.createOrReplaceTempView(tableName1);
+            phoenixDataSet = SparkUtil.getSparkSession().read().format("phoenix")
+                    .option(DataSourceOptions.TABLE_KEY, tableName2)
+                    .option(PhoenixDataSource.ZOOKEEPER_URL, getUrl()).load();
+            phoenixDataSet.createOrReplaceTempView(tableName2);
 
             String query =
                     "SELECT T1.* FROM " + tableName1 + " T1 JOIN " + tableName2
@@ -155,8 +149,8 @@ public void testOrderByWithJoin() throws Exception {
             assertFalse(rs.next());
 
             query =
-                    "select t1.a_string, t2.col1 from " + tableName1 + " t1 join " + tableName2
-                            + " t2 on t1.a_string = t2.a_string order by t2.col1";
+                    "SELECT T1.A_STRING, T2.COL1 FROM " + tableName1 + " T1 JOIN " + tableName2
+                            + " T2 ON T1.A_STRING = T2.A_STRING ORDER BY T2.COL1";
             dataset =  sqlContext.sql(query);
             rows = dataset.collectAsList();
             rs = new SparkResultSet(rows, dataset.columns());
@@ -228,23 +222,15 @@ public void testOrderByWithUnionAll() throws Exception {
             conn.commit();
 
 
-            List<String> table1Columns = Lists.newArrayList("A_STRING", "CF1.A", "CF1.B", "COL1", "CF2.C", "CF2.D");
-            SQLContext sqlContext = SparkUtil.getSqlContext();
-            Dataset phoenixDataSet =
-                    new PhoenixRDD(SparkUtil.getSparkContext(), tableName1,
-                            JavaConverters.collectionAsScalaIterableConverter(table1Columns)
-                                    .asScala().toSeq(),
-                            Option.apply((String) null), Option.apply(getUrl()), config, false,
-                            null).toDataFrame(sqlContext);
-            phoenixDataSet.registerTempTable(tableName1);
-            List<String> table2Columns = Lists.newArrayList("A_STRING", "COL1");
-            phoenixDataSet =
-                    new PhoenixRDD(SparkUtil.getSparkContext(), tableName2,
-                            JavaConverters.collectionAsScalaIterableConverter(table2Columns)
-                                    .asScala().toSeq(),
-                            Option.apply((String) null), Option.apply(getUrl()), config, false,
-                            null).toDataFrame(sqlContext);
-            phoenixDataSet.registerTempTable(tableName2);
+            SQLContext sqlContext = SparkUtil.getSparkSession().sqlContext();
+            Dataset phoenixDataSet = SparkUtil.getSparkSession().read().format("phoenix")
+                            .option(DataSourceOptions.TABLE_KEY, tableName1)
+                            .option(PhoenixDataSource.ZOOKEEPER_URL, getUrl()).load();
+            phoenixDataSet.createOrReplaceTempView(tableName1);
+            phoenixDataSet = SparkUtil.getSparkSession().read().format("phoenix")
+                    .option(DataSourceOptions.TABLE_KEY, tableName2)
+                    .option(PhoenixDataSource.ZOOKEEPER_URL, getUrl()).load();
+            phoenixDataSet.createOrReplaceTempView(tableName2);
 
             String query =
                     "select a_string, `cf2.d` from " + tableName1 + " union all select * from "
@@ -311,17 +297,11 @@ public void testOrderByWithExpression() throws Exception {
             stmt.execute();
             conn.commit();
 
-            SQLContext sqlContext = SparkUtil.getSqlContext();
-            Dataset phoenixDataSet =
-                    new PhoenixRDD(SparkUtil.getSparkContext(), tableName,
-                            JavaConverters
-                                    .collectionAsScalaIterableConverter(
-                                        Lists.newArrayList("col1", "col2", "col4"))
-                                    .asScala().toSeq(),
-                            Option.apply((String) null), Option.apply(getUrl()), config, false,
-                            null).toDataFrame(sqlContext);
-
-            phoenixDataSet.registerTempTable(tableName);
+            SQLContext sqlContext = SparkUtil.getSparkSession().sqlContext();
+            Dataset phoenixDataSet = SparkUtil.getSparkSession().read().format("phoenix")
+                    .option(DataSourceOptions.TABLE_KEY, tableName)
+                    .option(PhoenixDataSource.ZOOKEEPER_URL, getUrl()).load();
+            phoenixDataSet.createOrReplaceTempView(tableName);
             Dataset<Row> dataset =
                     sqlContext.sql("SELECT col1+col2, col4, a_string FROM " + tableName
                             + " ORDER BY col1+col2, col4");
@@ -379,19 +359,11 @@ public void testColumnFamily() throws Exception {
             conn.commit();
 
 
-            List<String> columns =
-                    Lists.newArrayList("A_STRING", "CF1.A", "CF1.B", "COL1", "CF2.C", "CF2.D",
-                        "COL2");
-
-            SQLContext sqlContext = SparkUtil.getSqlContext();
-            Dataset phoenixDataSet =
-                    new PhoenixRDD(SparkUtil.getSparkContext(), tableName,
-                            JavaConverters.collectionAsScalaIterableConverter(columns).asScala()
-                                    .toSeq(),
-                            Option.apply((String) null), Option.apply(url), config, false, null)
-                                    .toDataFrame(sqlContext);
-
-            phoenixDataSet.registerTempTable(tableName);
+            SQLContext sqlContext = SparkUtil.getSparkSession().sqlContext();
+            Dataset phoenixDataSet = SparkUtil.getSparkSession().read().format("phoenix")
+                    .option(DataSourceOptions.TABLE_KEY, tableName)
+                    .option(PhoenixDataSource.ZOOKEEPER_URL, getUrl()).load();
+            phoenixDataSet.createOrReplaceTempView(tableName);
             Dataset<Row> dataset =
                     sqlContext.sql("SELECT A_STRING, `CF1.A`, `CF1.B`, COL1, `CF2.C`, `CF2.D`, COL2 from "
                             + tableName + " ORDER BY `CF1.A`,`CF2.C`");
diff --git a/phoenix-spark/src/it/java/org/apache/phoenix/spark/SparkUtil.java b/phoenix-spark/src/it/java/org/apache/phoenix/spark/SparkUtil.java
index 628520921e..668c3c8890 100644
--- a/phoenix-spark/src/it/java/org/apache/phoenix/spark/SparkUtil.java
+++ b/phoenix-spark/src/it/java/org/apache/phoenix/spark/SparkUtil.java
@@ -22,13 +22,14 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.spark.datasource.v2.PhoenixDataSource;
 import org.apache.phoenix.util.QueryBuilder;
-import org.apache.spark.SparkContext;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SQLContext;
 import org.apache.spark.sql.SparkSession;
 import org.apache.spark.sql.execution.SparkPlan;
+import org.apache.spark.sql.sources.v2.DataSourceOptions;
 import scala.Option;
 import scala.collection.JavaConverters;
 
@@ -43,19 +44,14 @@
     public static final String NUM_EXECUTORS = "local[2]";
     public static final String UI_SHOW_CONSOLE_PROGRESS = "spark.ui.showConsoleProgress";
 
-    public static SparkContext getSparkContext() {
+    public static SparkSession getSparkSession() {
         return SparkSession.builder().appName(APP_NAME).master(NUM_EXECUTORS)
-                .config(UI_SHOW_CONSOLE_PROGRESS, false).getOrCreate().sparkContext();
-    }
-
-    public static SQLContext getSqlContext() {
-        return SparkSession.builder().appName(APP_NAME).master(NUM_EXECUTORS)
-                .config(UI_SHOW_CONSOLE_PROGRESS, false).getOrCreate().sqlContext();
+                .config(UI_SHOW_CONSOLE_PROGRESS, false).getOrCreate();
     }
 
     public static ResultSet executeQuery(Connection conn, QueryBuilder queryBuilder, String url, Configuration config)
             throws SQLException {
-        SQLContext sqlContext = SparkUtil.getSqlContext();
+        SQLContext sqlContext = getSparkSession().sqlContext();
 
         boolean forceRowKeyOrder =
                 conn.unwrap(PhoenixConnection.class).getQueryServices().getProps()
@@ -69,14 +65,11 @@ public static ResultSet executeQuery(Connection conn, QueryBuilder queryBuilder,
 
         // create PhoenixRDD using the table name and columns that are required by the query
         // since we don't set the predicate filtering is done after rows are returned from spark
-        Dataset phoenixDataSet =
-                new PhoenixRDD(SparkUtil.getSparkContext(), queryBuilder.getFullTableName(),
-                        JavaConverters.collectionAsScalaIterableConverter(queryBuilder.getRequiredColumns()).asScala()
-                                .toSeq(),
-                        Option.apply((String) null), Option.apply(url), config, false,
-                        null).toDataFrame(sqlContext);
+        Dataset phoenixDataSet = getSparkSession().read().format("phoenix")
+                .option(DataSourceOptions.TABLE_KEY, queryBuilder.getFullTableName())
+                .option(PhoenixDataSource.ZOOKEEPER_URL, url).load();
 
-        phoenixDataSet.registerTempTable(queryBuilder.getFullTableName());
+        phoenixDataSet.createOrReplaceTempView(queryBuilder.getFullTableName());
         Dataset<Row> dataset = sqlContext.sql(queryBuilder.build());
         SparkPlan plan = dataset.queryExecution().executedPlan();
         List<Row> rows = dataset.collectAsList();
diff --git a/phoenix-spark/src/it/resources/globalSetup.sql b/phoenix-spark/src/it/resources/globalSetup.sql
index 7ac003932b..efdb8cb47a 100644
--- a/phoenix-spark/src/it/resources/globalSetup.sql
+++ b/phoenix-spark/src/it/resources/globalSetup.sql
@@ -26,9 +26,9 @@ UPSERT INTO table2 (id, table1_id, "t2col1") VALUES (3, 2, 'test_child_1')
 UPSERT INTO table2 (id, table1_id, "t2col1") VALUES (4, 2, 'test_child_2')
 UPSERT INTO table2 (id, table1_id, "t2col1") VALUES (5, 2, 'test_child_3')
 UPSERT INTO table2 (id, table1_id, "t2col1") VALUES (6, 2, 'test_child_4')
-CREATE TABLE "table3" ("id" BIGINT NOT NULL PRIMARY KEY, "col1" VARCHAR)
-UPSERT INTO "table3" ("id", "col1") VALUES (1, 'foo')
-UPSERT INTO "table3" ("id", "col1") VALUES (2, 'bar')
+CREATE TABLE "table4" ("id" BIGINT NOT NULL PRIMARY KEY, "col1" VARCHAR)
+UPSERT INTO "table4" ("id", "col1") VALUES (1, 'foo')
+UPSERT INTO "table4" ("id", "col1") VALUES (2, 'bar')
 CREATE TABLE ARRAY_TEST_TABLE (ID BIGINT NOT NULL PRIMARY KEY, VCARRAY VARCHAR[])
 UPSERT INTO ARRAY_TEST_TABLE (ID, VCARRAY) VALUES (1, ARRAY['String1', 'String2', 'String3'])
 CREATE TABLE ARRAYBUFFER_TEST_TABLE (ID BIGINT NOT NULL PRIMARY KEY, VCARRAY VARCHAR[], INTARRAY INTEGER[])
diff --git a/phoenix-spark/src/it/scala/org/apache/phoenix/spark/AbstractPhoenixSparkIT.scala b/phoenix-spark/src/it/scala/org/apache/phoenix/spark/AbstractPhoenixSparkIT.scala
index ca3470f492..a9c2070ecb 100644
--- a/phoenix-spark/src/it/scala/org/apache/phoenix/spark/AbstractPhoenixSparkIT.scala
+++ b/phoenix-spark/src/it/scala/org/apache/phoenix/spark/AbstractPhoenixSparkIT.scala
@@ -19,6 +19,7 @@ import java.util.Properties
 import org.apache.phoenix.end2end.BaseHBaseManagedTimeIT
 import org.apache.phoenix.query.BaseTest
 import org.apache.phoenix.util.PhoenixRuntime
+import org.apache.spark.sql.{SQLContext, SparkSession}
 import org.apache.spark.{SparkConf, SparkContext}
 import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll, FunSuite, Matchers}
 
@@ -50,7 +51,7 @@ class AbstractPhoenixSparkIT extends FunSuite with Matchers with BeforeAndAfter
   final val TenantId = "theTenant"
 
   var conn: Connection = _
-  var sc: SparkContext = _
+  var spark: SparkSession = _
 
   lazy val hbaseConfiguration = {
     val conf = PhoenixSparkITHelper.getTestClusterConfig
@@ -99,12 +100,17 @@ class AbstractPhoenixSparkIT extends FunSuite with Matchers with BeforeAndAfter
       .setMaster("local[2]") // 2 threads, some parallelism
       .set("spark.ui.showConsoleProgress", "false") // Disable printing stage progress
 
-    sc = new SparkContext(conf)
+    spark = SparkSession
+      .builder()
+      .appName("PhoenixSparkIT")
+      .master("local[2]") // 2 threads, some parallelism
+      .config("spark.ui.showConsoleProgress", "false")
+      .getOrCreate()
   }
 
   override def afterAll() {
     conn.close()
-    sc.stop()
+    spark.stop()
     PhoenixSparkITHelper.cleanUpAfterTest()
     PhoenixSparkITHelper.doTeardown
   }
diff --git a/phoenix-spark/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala b/phoenix-spark/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala
index d1e38faf67..d6d0f921ab 100644
--- a/phoenix-spark/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala
+++ b/phoenix-spark/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala
@@ -13,16 +13,16 @@
  */
 package org.apache.phoenix.spark
 
+import java.sql.DriverManager
 import java.util.Date
 
 import org.apache.phoenix.schema.types.PVarchar
+import org.apache.phoenix.spark.datasource.v2.PhoenixDataSource
 import org.apache.phoenix.util.{ColumnInfo, SchemaUtil}
 import org.apache.spark.sql.types._
-import org.apache.spark.sql.{Row, SQLContext, SaveMode}
-import org.joda.time.DateTime
-import org.apache.spark.{SparkConf, SparkContext}
+import org.apache.spark.sql.{Row, SaveMode}
+
 import scala.collection.mutable.ListBuffer
-import org.apache.hadoop.conf.Configuration
 
 /**
   * Note: If running directly from an IDE, these are the recommended VM parameters:
@@ -30,13 +30,20 @@ import org.apache.hadoop.conf.Configuration
   */
 class PhoenixSparkIT extends AbstractPhoenixSparkIT {
 
-  test("Can persist data with case senstive columns (like in avro schema) using 'DataFrame.saveToPhoenix'") {
-    val sqlContext = new SQLContext(sc)
-    val df = sqlContext.createDataFrame(
+  test("Can persist data with case sensitive columns (like in avro schema)") {
+    val df = spark.createDataFrame(
       Seq(
         (1, 1, "test_child_1"),
-        (2, 1, "test_child_2"))).toDF("ID", "TABLE3_ID", "t2col1")
-    df.saveToPhoenix("TABLE3", zkUrl = Some(quorumAddress),skipNormalizingIdentifier=true)
+        (2, 1, "test_child_2"))).
+    // column names are case sensitive
+      toDF("ID", "TABLE3_ID", "t2col1")
+    df.write
+      .format("phoenix")
+      .options(Map("table" -> "TABLE3",
+        PhoenixDataSource.ZOOKEEPER_URL -> quorumAddress, PhoenixDataSource.SKIP_NORMALIZING_IDENTIFIER -> "true"))
+      .mode(SaveMode.Overwrite)
+      .save()
+
 
     // Verify results
     val stmt = conn.createStatement()
@@ -50,7 +57,29 @@ class PhoenixSparkIT extends AbstractPhoenixSparkIT {
     stmt.close()
 
     results.toList shouldEqual checkResults
+  }
+
+  // INSERT is not support using DataSource v2 api yet
+  ignore("Can use write data using spark SQL INSERT") {
+    val df1 = spark.sqlContext.read.format("phoenix")
+      .options( Map("table" -> "TABLE3", PhoenixDataSource.ZOOKEEPER_URL -> quorumAddress)).load
+    df1.createOrReplaceTempView("TABLE3")
+
+    // Insert data
+    spark.sql("INSERT INTO TABLE3 VALUES(10, 10, 10)")
+    spark.sql("INSERT INTO TABLE3 VALUES(20, 20, 20)")
+
+    // Verify results
+    val stmt = conn.createStatement()
+    val rs = stmt.executeQuery("SELECT * FROM TABLE3 WHERE ID>=10")
+    val expectedResults = List((10, 10, "10"), (20, 20, "20"))
+    val results = ListBuffer[(Long, Long, String)]()
+    while (rs.next()) {
+      results.append((rs.getLong(1), rs.getLong(2), rs.getString(3)))
+    }
+    stmt.close()
 
+    results.toList shouldEqual expectedResults
   }
   
   test("Can convert Phoenix schema") {
@@ -58,29 +87,25 @@ class PhoenixSparkIT extends AbstractPhoenixSparkIT {
       new ColumnInfo("varcharColumn", PVarchar.INSTANCE.getSqlType)
     )
 
-    val rdd = new PhoenixRDD(sc, "MyTable", Array("Foo", "Bar"),
-      conf = hbaseConfiguration)
-
-    val catalystSchema = rdd.phoenixSchemaToCatalystSchema(phoenixSchema)
+    val catalystSchema = SparkSchemaUtil.phoenixSchemaToCatalystSchema(phoenixSchema)
 
-    val expected = List(StructField("varcharColumn", StringType, nullable = true))
+    val expected = new StructType(List(StructField("varcharColumn", StringType, nullable = true)).toArray)
 
     catalystSchema shouldEqual expected
   }
 
   test("Can create schema RDD and execute query") {
-    val sqlContext = new SQLContext(sc)
+    val df1 = spark.sqlContext.read.format("phoenix")
+      .options( Map("table" -> "TABLE1", PhoenixDataSource.ZOOKEEPER_URL -> quorumAddress)).load
 
-    val df1 = sqlContext.phoenixTableAsDataFrame("TABLE1", Array("ID", "COL1"), conf = hbaseConfiguration)
+    df1.createOrReplaceTempView("sql_table_1")
 
-    df1.registerTempTable("sql_table_1")
+    val df2 = spark.sqlContext.read.format("phoenix")
+      .options( Map("table" -> "TABLE2", PhoenixDataSource.ZOOKEEPER_URL -> quorumAddress)).load
 
-    val df2 = sqlContext.phoenixTableAsDataFrame("TABLE2", Array("ID", "TABLE1_ID"),
-      conf = hbaseConfiguration)
+    df2.createOrReplaceTempView("sql_table_2")
 
-    df2.registerTempTable("sql_table_2")
-
-    val sqlRdd = sqlContext.sql(
+    val sqlRdd = spark.sql(
       """
         |SELECT t1.ID, t1.COL1, t2.ID, t2.TABLE1_ID FROM sql_table_1 AS t1
         |INNER JOIN sql_table_2 AS t2 ON (t2.TABLE1_ID = t1.ID)""".stripMargin
@@ -91,18 +116,49 @@ class PhoenixSparkIT extends AbstractPhoenixSparkIT {
     count shouldEqual 6L
   }
 
-  test("Can create schema RDD and execute query on case sensitive table (no config)") {
-    val sqlContext = new SQLContext(sc)
+  ignore("Ordering by pk columns should not require sorting") {
+    val df1 = spark.sqlContext.read.format("phoenix")
+      .options( Map("table" -> "TABLE1", PhoenixDataSource.ZOOKEEPER_URL -> quorumAddress)).load
+    df1.createOrReplaceTempView("TABLE1")
 
+    val sqlRdd = spark.sql("SELECT * FROM TABLE1 ORDER BY ID, COL1")
+    val plan = sqlRdd.queryExecution.sparkPlan
+    // verify the spark plan doesn't have a sort
+    assert(!plan.toString.contains("Sort"))
 
-    val df1 = sqlContext.phoenixTableAsDataFrame(
-      SchemaUtil.getEscapedArgument("table3"),
-      Array("id", "col1"),
-      zkUrl = Some(quorumAddress))
+    val expectedResults = Array(Row.fromSeq(Seq(1, "test_row_1")), Row.fromSeq(Seq(2, "test_row_2")))
+    val actual = sqlRdd.collect()
 
-    df1.registerTempTable("table3")
+    actual shouldEqual expectedResults
+  }
+
+  test("Verify correct number of partitions are created") {
+    val conn = DriverManager.getConnection(PhoenixSparkITHelper.getUrl)
+    val ddl = "CREATE TABLE SPLIT_TABLE (id VARCHAR NOT NULL PRIMARY KEY, val VARCHAR) split on ('e','j','o')"
+    conn.createStatement.execute(ddl)
+    val keys = Array("a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k", "l", "m", "n", "o", "p", "q", "r", "s",
+      "t", "u", "v", "w", "x", "y", "z")
+    for (key <- keys) {
+      conn.createStatement.execute("UPSERT INTO SPLIT_TABLE VALUES('" + key + "', '" + key + "')")
+    }
+    conn.commit()
 
-    val sqlRdd = sqlContext.sql("SELECT * FROM table3")
+    val df1 = spark.sqlContext.read.format("phoenix")
+      .options( Map("table" -> "SPLIT_TABLE", PhoenixDataSource.ZOOKEEPER_URL -> quorumAddress)).load
+    df1.createOrReplaceTempView("SPLIT_TABLE")
+    val sqlRdd = spark.sql("SELECT * FROM SPLIT_TABLE")
+    val numPartitions = sqlRdd.rdd.partitions.size
+
+    numPartitions shouldEqual 4
+  }
+
+  test("Can create schema RDD and execute query on case sensitive table (no config)") {
+    val df1 = spark.sqlContext.read.format("phoenix")
+      .options( Map("table" -> SchemaUtil.getEscapedArgument("table4"), PhoenixDataSource.ZOOKEEPER_URL -> quorumAddress)).load
+
+    df1.createOrReplaceTempView("table4")
+
+    val sqlRdd = spark.sql("SELECT id FROM table4")
 
     val count = sqlRdd.count()
 
@@ -110,20 +166,17 @@ class PhoenixSparkIT extends AbstractPhoenixSparkIT {
   }
 
   test("Can create schema RDD and execute constrained query") {
-    val sqlContext = new SQLContext(sc)
-
-    val df1 = sqlContext.phoenixTableAsDataFrame("TABLE1", Array("ID", "COL1"),
-      conf = hbaseConfiguration)
+    val df1 = spark.sqlContext.read.format("phoenix")
+      .options( Map("table" -> "TABLE1", PhoenixDataSource.ZOOKEEPER_URL -> quorumAddress)).load
 
-    df1.registerTempTable("sql_table_1")
+    df1.createOrReplaceTempView("sql_table_1")
 
-    val df2 = sqlContext.phoenixTableAsDataFrame("TABLE2", Array("ID", "TABLE1_ID"),
-      predicate = Some("\"ID\" = 1"),
-      conf = hbaseConfiguration)
+    val df2 = spark.sqlContext.read.format("phoenix")
+      .options( Map("table" -> "TABLE2", PhoenixDataSource.ZOOKEEPER_URL -> quorumAddress)).load.filter("ID = 1")
 
-    df2.registerTempTable("sql_table_2")
+    df2.createOrReplaceTempView("sql_table_2")
 
-    val sqlRdd = sqlContext.sql(
+    val sqlRdd = spark.sql(
       """
         |SELECT t1.ID, t1.COL1, t2.ID, t2.TABLE1_ID FROM sql_table_1 AS t1
         |INNER JOIN sql_table_2 AS t2 ON (t2.TABLE1_ID = t1.ID)""".stripMargin
@@ -135,17 +188,12 @@ class PhoenixSparkIT extends AbstractPhoenixSparkIT {
   }
 
   test("Can create schema RDD with predicate that will never match") {
-    val sqlContext = new SQLContext(sc)
-
-    val df1 = sqlContext.phoenixTableAsDataFrame(
-      SchemaUtil.getEscapedArgument("table3"),
-      Array("id", "col1"),
-      predicate = Some("\"id\" = -1"),
-      conf = hbaseConfiguration)
+    val df1 = spark.sqlContext.read.format("phoenix")
+      .options( Map("table" -> "TABLE1", PhoenixDataSource.ZOOKEEPER_URL -> quorumAddress)).load.filter("ID = -1")
 
-    df1.registerTempTable("table3")
+    df1.createOrReplaceTempView("table3")
 
-    val sqlRdd = sqlContext.sql("SELECT * FROM table3")
+    val sqlRdd = spark.sql("SELECT * FROM table3")
 
     val count = sqlRdd.count()
 
@@ -153,21 +201,17 @@ class PhoenixSparkIT extends AbstractPhoenixSparkIT {
   }
 
   test("Can create schema RDD with complex predicate") {
-    val sqlContext = new SQLContext(sc)
-
-    val df1 = sqlContext.phoenixTableAsDataFrame(
-      "DATE_PREDICATE_TEST_TABLE",
-      Array("ID", "TIMESERIES_KEY"),
-      predicate = Some(
-        """
-          |ID > 0 AND TIMESERIES_KEY BETWEEN
-          |CAST(TO_DATE('1990-01-01 00:00:01', 'yyyy-MM-dd HH:mm:ss') AS TIMESTAMP) AND
-          |CAST(TO_DATE('1990-01-30 00:00:01', 'yyyy-MM-dd HH:mm:ss') AS TIMESTAMP)""".stripMargin),
-      conf = hbaseConfiguration)
+    val predicate = "ID > 0 AND TIMESERIES_KEY BETWEEN " +
+      "CAST(TO_DATE('1990-01-01 00:00:01', 'yyyy-MM-dd HH:mm:ss') AS TIMESTAMP) AND " +
+      "CAST(TO_DATE('1990-01-30 00:00:01', 'yyyy-MM-dd HH:mm:ss') AS TIMESTAMP)"
+    val df1 = spark.sqlContext.read.format("phoenix")
+      .options(Map("table" -> "DATE_PREDICATE_TEST_TABLE", PhoenixDataSource.ZOOKEEPER_URL -> quorumAddress))
+      .load
+      .filter(predicate)
 
-    df1.registerTempTable("date_predicate_test_table")
+    df1.createOrReplaceTempView("date_predicate_test_table")
 
-    val sqlRdd = df1.sqlContext.sql("SELECT * FROM date_predicate_test_table")
+    val sqlRdd = spark.sqlContext.sql("SELECT * FROM date_predicate_test_table")
 
     val count = sqlRdd.count()
 
@@ -175,14 +219,12 @@ class PhoenixSparkIT extends AbstractPhoenixSparkIT {
   }
 
   test("Can query an array table") {
-    val sqlContext = new SQLContext(sc)
+    val df1 = spark.sqlContext.read.format("phoenix")
+      .options( Map("table" -> "ARRAY_TEST_TABLE", PhoenixDataSource.ZOOKEEPER_URL -> quorumAddress)).load
 
-    val df1 = sqlContext.phoenixTableAsDataFrame("ARRAY_TEST_TABLE", Array("ID", "VCARRAY"),
-      conf = hbaseConfiguration)
+    df1.createOrReplaceTempView("ARRAY_TEST_TABLE")
 
-    df1.registerTempTable("ARRAY_TEST_TABLE")
-
-    val sqlRdd = sqlContext.sql("SELECT * FROM ARRAY_TEST_TABLE")
+    val sqlRdd = spark.sql("SELECT * FROM ARRAY_TEST_TABLE")
 
     val count = sqlRdd.count()
 
@@ -195,12 +237,12 @@ class PhoenixSparkIT extends AbstractPhoenixSparkIT {
   }
 
   test("Can read a table as an RDD") {
-    val rdd1 = sc.phoenixTableAsRDD("ARRAY_TEST_TABLE", Seq("ID", "VCARRAY"),
-      conf = hbaseConfiguration)
+    val rdd1 = spark.sqlContext.read.format("phoenix")
+      .options( Map("table" -> "ARRAY_TEST_TABLE", PhoenixDataSource.ZOOKEEPER_URL -> quorumAddress)).load
 
     val count = rdd1.count()
 
-    val arrayValues = rdd1.take(1)(0)("VCARRAY")
+    val arrayValues = rdd1.take(1)(0)(1)
 
     arrayValues should equal(Array("String1", "String2", "String3"))
 
@@ -208,24 +250,30 @@ class PhoenixSparkIT extends AbstractPhoenixSparkIT {
   }
 
   test("Can save to phoenix table") {
-    val sqlContext = new SQLContext(sc)
+    val dataSet = List(Row(1L, "1", 1), Row(2L, "2", 2), Row(3L, "3", 3))
+
+    val schema = StructType(
+      Seq(StructField("ID", LongType, nullable = false),
+        StructField("COL1", StringType),
+        StructField("COL2", IntegerType)))
 
-    val dataSet = List((1L, "1", 1), (2L, "2", 2), (3L, "3", 3))
+    val rowRDD = spark.sparkContext.parallelize(dataSet)
 
-    sc
-      .parallelize(dataSet)
-      .saveToPhoenix(
-        "OUTPUT_TEST_TABLE",
-        Seq("ID", "COL1", "COL2"),
-        hbaseConfiguration
-      )
+    // Apply the schema to the RDD.
+    val df = spark.sqlContext.createDataFrame(rowRDD, schema)
+
+    df.write
+      .format("phoenix")
+      .options(Map("table" -> "OUTPUT_TEST_TABLE", PhoenixDataSource.ZOOKEEPER_URL -> quorumAddress))
+      .mode(SaveMode.Overwrite)
+      .save()
 
     // Load the results back
     val stmt = conn.createStatement()
     val rs = stmt.executeQuery("SELECT ID, COL1, COL2 FROM OUTPUT_TEST_TABLE")
-    val results = ListBuffer[(Long, String, Int)]()
+    val results = ListBuffer[Row]()
     while (rs.next()) {
-      results.append((rs.getLong(1), rs.getString(2), rs.getInt(3)))
+      results.append(Row(rs.getLong(1), rs.getString(2), rs.getInt(3)))
     }
 
     // Verify they match
@@ -234,18 +282,29 @@ class PhoenixSparkIT extends AbstractPhoenixSparkIT {
     }
   }
 
-  test("Can save Java and Joda dates to Phoenix (no config)") {
-    val dt = new DateTime()
-    val date = new Date()
+  test("Can save dates to Phoenix using java.sql.Date") {
+    val date = java.sql.Date.valueOf("2016-09-30")
+
+    // Since we are creating a Row we have to use java.sql.date
+    // java.util.date or joda.DateTime is not supported
+    val dataSet = Seq(Row(1L, "1", 1, date), Row(2L, "2", 2, date))
+
+    val schema = StructType(
+      Seq(StructField("ID", LongType, nullable = false),
+        StructField("COL1", StringType),
+        StructField("COL2", IntegerType),
+        StructField("COL3", DateType)))
 
-    val dataSet = List((1L, "1", 1, dt), (2L, "2", 2, date))
-    sc
-      .parallelize(dataSet)
-      .saveToPhoenix(
-        "OUTPUT_TEST_TABLE",
-        Seq("ID", "COL1", "COL2", "COL3"),
-        zkUrl = Some(quorumAddress)
-      )
+    val rowRDD = spark.sparkContext.parallelize(dataSet)
+
+    // Apply the schema to the RDD.
+    val df = spark.sqlContext.createDataFrame(rowRDD, schema)
+
+    df.write
+      .format("phoenix")
+      .options(Map("table" -> "OUTPUT_TEST_TABLE", PhoenixDataSource.ZOOKEEPER_URL -> quorumAddress))
+      .mode(SaveMode.Overwrite)
+      .save()
 
     // Load the results back
     val stmt = conn.createStatement()
@@ -256,94 +315,56 @@ class PhoenixSparkIT extends AbstractPhoenixSparkIT {
     }
 
     // Verify the epochs are equal
-    results(0).getTime shouldEqual dt.getMillis
+    results(0).getTime shouldEqual date.getTime
     results(1).getTime shouldEqual date.getTime
   }
 
   test("Can infer schema without defining columns") {
-    val sqlContext = new SQLContext(sc)
-    val df = sqlContext.phoenixTableAsDataFrame("TABLE2", Seq(), conf = hbaseConfiguration)
+    val df = spark.sqlContext.read.format("phoenix")
+      .options( Map("table" -> "TABLE2", PhoenixDataSource.ZOOKEEPER_URL -> quorumAddress)).load()
     df.schema("ID").dataType shouldEqual LongType
     df.schema("TABLE1_ID").dataType shouldEqual LongType
     df.schema("t2col1").dataType shouldEqual StringType
   }
 
   test("Spark SQL can use Phoenix as a data source with no schema specified") {
-    val sqlContext = new SQLContext(sc)
-    val df = sqlContext.load("org.apache.phoenix.spark", Map("table" -> "TABLE1",
-      "zkUrl" -> quorumAddress))
+    val df = spark.sqlContext.read.format("phoenix")
+      .options( Map("table" -> "TABLE1", PhoenixDataSource.ZOOKEEPER_URL -> quorumAddress)).load
     df.count() shouldEqual 2
     df.schema("ID").dataType shouldEqual LongType
     df.schema("COL1").dataType shouldEqual StringType
   }
 
-  test("Spark SQL can use Phoenix as a data source with PrunedFilteredScan") {
-    val sqlContext = new SQLContext(sc)
-    val df = sqlContext.load("org.apache.phoenix.spark", Map("table" -> "TABLE1",
-      "zkUrl" -> quorumAddress))
+  test("Datasource v2 pushes down filters") {
+    val df = spark.sqlContext.read.format("phoenix")
+      .options( Map("table" -> "TABLE1", PhoenixDataSource.ZOOKEEPER_URL -> quorumAddress)).load
     val res = df.filter(df("COL1") === "test_row_1" && df("ID") === 1L).select(df("ID"))
 
     // Make sure we got the right value back
     assert(res.first().getLong(0) == 1L)
 
     val plan = res.queryExecution.sparkPlan
-    // filters should be pushed into phoenix relation
-    assert(plan.toString.contains("PushedFilters: [*IsNotNull(COL1), *IsNotNull(ID), " +
-      "*EqualTo(COL1,test_row_1), *EqualTo(ID,1)]"))
-    // spark should run the filters on the rows returned by Phoenix
-    assert(!plan.toString.matches(".*Filter (((isnotnull(COL1.*) && isnotnull(ID.*)) "
-      + " && (COL1.* = test_row_1)) && (ID.* = 1)).*"))
+    // filters should be pushed into scan
+    assert(".*ScanV2 phoenix.*Filters.*ID.*COL1.*".r.findFirstIn(plan.toString).isDefined)
+    // spark should not do post scan filtering
+    assert(".*Filter .*ID.*COL1.*".r.findFirstIn(plan.toString).isEmpty)
   }
 
-  test("Can persist a dataframe using 'DataFrame.saveToPhoenix'") {
+  test("Can persist a dataframe") {
     // Load from TABLE1
-    val sqlContext = new SQLContext(sc)
-    val df = sqlContext.load("org.apache.phoenix.spark", Map("table" -> "TABLE1",
-      "zkUrl" -> quorumAddress))
+    val df = spark.sqlContext.read.format("phoenix").options( Map("table" -> "TABLE1", PhoenixDataSource.ZOOKEEPER_URL -> quorumAddress)).load
 
     // Save to TABLE1_COPY
-    df.saveToPhoenix("TABLE1_COPY", zkUrl = Some(quorumAddress))
-
-    // Verify results
-    val stmt = conn.createStatement()
-    val rs = stmt.executeQuery("SELECT * FROM TABLE1_COPY")
-
-    val checkResults = List((1L, "test_row_1"), (2, "test_row_2"))
-    val results = ListBuffer[(Long, String)]()
-    while (rs.next()) {
-      results.append((rs.getLong(1), rs.getString(2)))
-    }
-    stmt.close()
-
-    results.toList shouldEqual checkResults
-  }
-
-  test("Can persist a dataframe using 'DataFrame.save()") {
-    // Clear TABLE1_COPY
-    var stmt = conn.createStatement()
-    stmt.executeUpdate("DELETE FROM TABLE1_COPY")
-    stmt.close()
-
-    // Load TABLE1, save as TABLE1_COPY
-    val sqlContext = new SQLContext(sc)
-    val df = sqlContext
-      .read
-      .format("org.apache.phoenix.spark")
-      .option("table", "TABLE1")
-      .option("zkUrl", quorumAddress)
-      .load()
-
-    // Save to TABLE21_COPY
     df
       .write
-      .format("org.apache.phoenix.spark")
+      .format("phoenix")
       .mode(SaveMode.Overwrite)
       .option("table", "TABLE1_COPY")
-      .option("zkUrl", quorumAddress)
+      .option(PhoenixDataSource.ZOOKEEPER_URL, quorumAddress)
       .save()
 
     // Verify results
-    stmt = conn.createStatement()
+    val stmt = conn.createStatement()
     val rs = stmt.executeQuery("SELECT * FROM TABLE1_COPY")
 
     val checkResults = List((1L, "test_row_1"), (2, "test_row_2"))
@@ -357,15 +378,22 @@ class PhoenixSparkIT extends AbstractPhoenixSparkIT {
   }
 
   test("Can save arrays back to phoenix") {
-    val dataSet = List((2L, Array("String1", "String2", "String3")))
+    val dataSet = List(Row(2L, Array("String1", "String2", "String3")))
+    val schema = StructType(Seq(
+      StructField("ID", LongType, nullable = false),
+      StructField("VCARRAY", ArrayType(StringType, true))
+    ))
 
-    sc
-      .parallelize(dataSet)
-      .saveToPhoenix(
-        "ARRAY_TEST_TABLE",
-        Seq("ID", "VCARRAY"),
-        zkUrl = Some(quorumAddress)
-      )
+    val rowRDD = spark.sparkContext.parallelize(dataSet)
+
+    // Apply the schema to the RDD.
+    val df = spark.sqlContext.createDataFrame(rowRDD, schema)
+
+    df.write
+      .format("phoenix")
+      .options(Map("table" -> "ARRAY_TEST_TABLE", PhoenixDataSource.ZOOKEEPER_URL -> quorumAddress))
+      .mode(SaveMode.Overwrite)
+      .save()
 
     // Load the results back
     val stmt = conn.createStatement()
@@ -374,57 +402,54 @@ class PhoenixSparkIT extends AbstractPhoenixSparkIT {
     val sqlArray = rs.getArray(1).getArray().asInstanceOf[Array[String]]
 
     // Verify the arrays are equal
-    sqlArray shouldEqual dataSet(0)._2
+    sqlArray shouldEqual dataSet(0).get(1)
   }
 
   test("Can read from table with schema and escaped table name") {
     // Manually escape
-    val rdd1 = sc.phoenixTableAsRDD(
-      "CUSTOM_ENTITY.\"z02\"",
-      Seq("ID"),
-      conf = hbaseConfiguration)
+    val df1 = spark.sqlContext.read.format("phoenix")
+      .options(Map("table" -> "CUSTOM_ENTITY.\"z02\"", PhoenixDataSource.ZOOKEEPER_URL -> quorumAddress)).load()
 
-    var count = rdd1.count()
+    var count = df1.count()
 
     count shouldEqual 1L
 
     // Use SchemaUtil
-    val rdd2 = sc.phoenixTableAsRDD(
-      SchemaUtil.getEscapedFullTableName("CUSTOM_ENTITY.z02"),
-      Seq("ID"),
-      conf = hbaseConfiguration)
+    val df2 = spark.sqlContext.read.format("phoenix")
+      .options(
+        Map("table" -> SchemaUtil.getEscapedFullTableName("CUSTOM_ENTITY.z02"), PhoenixDataSource.ZOOKEEPER_URL -> quorumAddress))
+      .load()
 
-    count = rdd2.count()
+    count = df2.count()
 
     count shouldEqual 1L
-
   }
 
   test("Ensure DataFrame field normalization (PHOENIX-2196)") {
-    val rdd1 = sc
+    val rdd1 = spark.sparkContext
       .parallelize(Seq((1L, 1L, "One"), (2L, 2L, "Two")))
       .map(p => Row(p._1, p._2, p._3))
 
-    val sqlContext = new SQLContext(sc)
-
     val schema = StructType(Seq(
       StructField("id", LongType, nullable = false),
       StructField("table1_id", LongType, nullable = true),
       StructField("\"t2col1\"", StringType, nullable = true)
     ))
 
-    val df = sqlContext.createDataFrame(rdd1, schema)
+    val df = spark.sqlContext.createDataFrame(rdd1, schema)
 
-    df.saveToPhoenix("TABLE2", zkUrl = Some(quorumAddress))
+    df.write
+      .format("phoenix")
+      .options(Map("table" -> "TABLE2", PhoenixDataSource.ZOOKEEPER_URL -> quorumAddress))
+      .mode(SaveMode.Overwrite)
+      .save()
   }
 
   test("Ensure Dataframe supports LIKE and IN filters (PHOENIX-2328)") {
-    val sqlContext = new SQLContext(sc)
-    val df = sqlContext.load("org.apache.phoenix.spark", Map("table" -> "TABLE1",
-      "zkUrl" -> quorumAddress))
-
+    val df = spark.sqlContext.read.format("phoenix").options(Map("table" -> "TABLE1", PhoenixDataSource.ZOOKEEPER_URL -> quorumAddress)).load()
     // Prefix match
     val res1 = df.filter("COL1 like 'test_row_%'")
+    val plan = res1.groupBy().count().queryExecution.sparkPlan
     res1.count() shouldEqual 2
 
     // Suffix match
@@ -463,14 +488,14 @@ class PhoenixSparkIT extends AbstractPhoenixSparkIT {
   }
 
   test("Can load decimal types with accurate precision and scale (PHOENIX-2288)") {
-    val sqlContext = new SQLContext(sc)
-    val df = sqlContext.load("org.apache.phoenix.spark", Map("table" -> "TEST_DECIMAL", "zkUrl" -> quorumAddress))
+    val df = spark.sqlContext.read.format("phoenix")
+      .options(Map("table" -> "TEST_DECIMAL", PhoenixDataSource.ZOOKEEPER_URL -> quorumAddress)).load()
     assert(df.select("COL1").first().getDecimal(0) == BigDecimal("123.456789").bigDecimal)
   }
 
-  test("Can load small and tiny integeger types (PHOENIX-2426)") {
-    val sqlContext = new SQLContext(sc)
-    val df = sqlContext.load("org.apache.phoenix.spark", Map("table" -> "TEST_SMALL_TINY", "zkUrl" -> quorumAddress))
+  test("Can load small and tiny integer types (PHOENIX-2426)") {
+    val df = spark.sqlContext.read.format("phoenix")
+      .options(Map("table" -> "TEST_SMALL_TINY", PhoenixDataSource.ZOOKEEPER_URL -> quorumAddress)).load()
     assert(df.select("COL1").first().getShort(0).toInt == 32767)
     assert(df.select("COL2").first().getByte(0).toInt == 127)
   }
@@ -478,21 +503,19 @@ class PhoenixSparkIT extends AbstractPhoenixSparkIT {
   test("Can save arrays from custom dataframes back to phoenix") {
     val dataSet = List(Row(2L, Array("String1", "String2", "String3"), Array(1, 2, 3)))
 
-    val sqlContext = new SQLContext(sc)
-
     val schema = StructType(
       Seq(StructField("ID", LongType, nullable = false),
         StructField("VCARRAY", ArrayType(StringType)),
         StructField("INTARRAY", ArrayType(IntegerType))))
 
-    val rowRDD = sc.parallelize(dataSet)
+    val rowRDD = spark.sparkContext.parallelize(dataSet)
 
     // Apply the schema to the RDD.
-    val df = sqlContext.createDataFrame(rowRDD, schema)
+    val df = spark.sqlContext.createDataFrame(rowRDD, schema)
 
     df.write
-      .format("org.apache.phoenix.spark")
-      .options(Map("table" -> "ARRAYBUFFER_TEST_TABLE", "zkUrl" -> quorumAddress))
+      .format("phoenix")
+      .options(Map("table" -> "ARRAYBUFFER_TEST_TABLE", PhoenixDataSource.ZOOKEEPER_URL -> quorumAddress))
       .mode(SaveMode.Overwrite)
       .save()
 
@@ -509,15 +532,23 @@ class PhoenixSparkIT extends AbstractPhoenixSparkIT {
   }
 
   test("Can save arrays of AnyVal type back to phoenix") {
-    val dataSet = List((2L, Array(1, 2, 3), Array(1L, 2L, 3L)))
+    val dataSet = List(Row(2L, Array(1, 2, 3), Array(1L, 2L, 3L)))
+
+    val schema = StructType(
+      Seq(StructField("ID", LongType, nullable = false),
+        StructField("INTARRAY", ArrayType(IntegerType)),
+        StructField("BIGINTARRAY", ArrayType(LongType))))
+
+    val rowRDD = spark.sparkContext.parallelize(dataSet)
+
+    // Apply the schema to the RDD.
+    val df = spark.sqlContext.createDataFrame(rowRDD, schema)
 
-    sc
-      .parallelize(dataSet)
-      .saveToPhoenix(
-        "ARRAY_ANYVAL_TEST_TABLE",
-        Seq("ID", "INTARRAY", "BIGINTARRAY"),
-        zkUrl = Some(quorumAddress)
-      )
+    df.write
+      .format("phoenix")
+      .options(Map("table" -> "ARRAY_ANYVAL_TEST_TABLE", PhoenixDataSource.ZOOKEEPER_URL -> quorumAddress))
+      .mode(SaveMode.Overwrite)
+      .save()
 
     // Load the results back
     val stmt = conn.createStatement()
@@ -527,20 +558,27 @@ class PhoenixSparkIT extends AbstractPhoenixSparkIT {
     val longArray = rs.getArray(2).getArray().asInstanceOf[Array[Long]]
 
     // Verify the arrays are equal
-    intArray shouldEqual dataSet(0)._2
-    longArray shouldEqual dataSet(0)._3
+    intArray shouldEqual dataSet(0).get(1)
+    longArray shouldEqual dataSet(0).get(2)
   }
 
   test("Can save arrays of Byte type back to phoenix") {
-    val dataSet = List((2L, Array(1.toByte, 2.toByte, 3.toByte)))
+    val dataSet = List(Row(2L, Array(1.toByte, 2.toByte, 3.toByte)))
 
-    sc
-      .parallelize(dataSet)
-      .saveToPhoenix(
-        "ARRAY_BYTE_TEST_TABLE",
-        Seq("ID", "BYTEARRAY"),
-        zkUrl = Some(quorumAddress)
-      )
+    val schema = StructType(
+      Seq(StructField("ID", LongType, nullable = false),
+        StructField("BYTEARRAY", ArrayType(ByteType))))
+
+    val rowRDD = spark.sparkContext.parallelize(dataSet)
+
+    // Apply the schema to the RDD.
+    val df = spark.sqlContext.createDataFrame(rowRDD, schema)
+
+    df.write
+      .format("phoenix")
+      .options(Map("table" -> "ARRAY_BYTE_TEST_TABLE", PhoenixDataSource.ZOOKEEPER_URL -> quorumAddress))
+      .mode(SaveMode.Overwrite)
+      .save()
 
     // Load the results back
     val stmt = conn.createStatement()
@@ -549,19 +587,28 @@ class PhoenixSparkIT extends AbstractPhoenixSparkIT {
     val byteArray = rs.getArray(1).getArray().asInstanceOf[Array[Byte]]
 
     // Verify the arrays are equal
-    byteArray shouldEqual dataSet(0)._2
+    byteArray shouldEqual dataSet(0).get(1)
   }
 
   test("Can save binary types back to phoenix") {
-    val dataSet = List((2L, Array[Byte](1), Array[Byte](1, 2, 3), Array[Array[Byte]](Array[Byte](1), Array[Byte](2))))
+    val dataSet = List(Row(2L, Array[Byte](1), Array[Byte](1, 2, 3), Array[Array[Byte]](Array[Byte](1), Array[Byte](2))))
+
+    val schema = StructType(
+      Seq(StructField("ID", LongType, false),
+        StructField("BIN", BinaryType),
+        StructField("VARBIN", BinaryType),
+        StructField("BINARRAY", ArrayType(BinaryType))))
+
+    val rowRDD = spark.sparkContext.parallelize(dataSet)
+
+    // Apply the schema to the RDD.
+    val df = spark.sqlContext.createDataFrame(rowRDD, schema)
 
-    sc
-      .parallelize(dataSet)
-      .saveToPhoenix(
-        "VARBINARY_TEST_TABLE",
-        Seq("ID", "BIN", "VARBIN", "BINARRAY"),
-        zkUrl = Some(quorumAddress)
-      )
+    df.write
+      .format("phoenix")
+      .options(Map("table" -> "VARBINARY_TEST_TABLE", PhoenixDataSource.ZOOKEEPER_URL -> quorumAddress))
+      .mode(SaveMode.Overwrite)
+      .save()
 
     // Load the results back
     val stmt = conn.createStatement()
@@ -572,16 +619,15 @@ class PhoenixSparkIT extends AbstractPhoenixSparkIT {
     val varByteArray = rs.getArray("BINARRAY").getArray().asInstanceOf[Array[Array[Byte]]]
 
     // Verify the arrays are equal
-    byte shouldEqual dataSet(0)._2
-    varByte shouldEqual dataSet(0)._3
-    varByteArray shouldEqual dataSet(0)._4
+    byte shouldEqual dataSet(0).get(1)
+    varByte shouldEqual dataSet(0).get(2)
+    varByteArray shouldEqual dataSet(0).get(3)
   }
 
   test("Can load Phoenix DATE columns through DataFrame API") {
-    val sqlContext = new SQLContext(sc)
-    val df = sqlContext.read
-      .format("org.apache.phoenix.spark")
-      .options(Map("table" -> "DATE_TEST", "zkUrl" -> quorumAddress))
+    val df = spark.sqlContext.read
+      .format("phoenix")
+      .options(Map("table" -> "DATE_TEST", PhoenixDataSource.ZOOKEEPER_URL -> quorumAddress))
       .load
     val dt = df.select("COL1").first().getDate(0).getTime
     val epoch = new Date().getTime
@@ -595,37 +641,37 @@ class PhoenixSparkIT extends AbstractPhoenixSparkIT {
   }
 
   test("Filter operation doesn't work for column names containing a white space (PHOENIX-2547)") {
-    val sqlContext = new SQLContext(sc)
-    val df = sqlContext.load("org.apache.phoenix.spark", Map("table" -> SchemaUtil.getEscapedArgument("space"),
-      "zkUrl" -> quorumAddress))
+    val df = spark.sqlContext.read.format("phoenix")
+      .options(Map("table" -> SchemaUtil.getEscapedArgument("space"), PhoenixDataSource.ZOOKEEPER_URL -> quorumAddress))
+      .load
     val res = df.filter(df.col("first name").equalTo("xyz"))
     // Make sure we got the right value back
     assert(res.collectAsList().size() == 1L)
   }
 
   test("Spark Phoenix cannot recognize Phoenix view fields (PHOENIX-2290)") {
-    val sqlContext = new SQLContext(sc)
-    val df = sqlContext.load("org.apache.phoenix.spark", Map("table" -> SchemaUtil.getEscapedArgument("small"),
-      "zkUrl" -> quorumAddress))
-    df.registerTempTable("temp")
+    val df = spark.sqlContext.read.format("phoenix")
+      .options(Map("table" -> SchemaUtil.getEscapedArgument("small"), PhoenixDataSource.ZOOKEEPER_URL -> quorumAddress))
+      .load
+    df.createOrReplaceTempView("temp")
 
     // limitation: filter / where expressions are not allowed with "double quotes", instead of that pass it as column expressions
     // reason: if the expression contains "double quotes" then spark sql parser, ignoring evaluating .. giving to next level to handle
 
-    val res1 = sqlContext.sql("select * from temp where salary = '10000' ")
+    val res1 = spark.sql("select * from temp where salary = '10000' ")
     assert(res1.collectAsList().size() == 1L)
 
-    val res2 = sqlContext.sql("select * from temp where \"salary\" = '10000' ")
+    val res2 = spark.sql("select * from temp where \"salary\" = '10000' ")
     assert(res2.collectAsList().size() == 0L)
 
-    val res3 = sqlContext.sql("select * from temp where salary > '10000' ")
+    val res3 = spark.sql("select * from temp where salary > '10000' ")
     assert(res3.collectAsList().size() == 2L)
   }
 
   test("Queries with small case column-names return empty result-set when working with Spark Datasource Plugin (PHOENIX-2336)") {
-    val sqlContext = new SQLContext(sc)
-    val df = sqlContext.load("org.apache.phoenix.spark", Map("table" -> SchemaUtil.getEscapedArgument("small"),
-      "zkUrl" -> quorumAddress))
+    val df = spark.sqlContext.read.format("phoenix")
+      .options(Map("table" -> SchemaUtil.getEscapedArgument("small"), PhoenixDataSource.ZOOKEEPER_URL -> quorumAddress))
+      .load
 
     // limitation: filter / where expressions are not allowed with "double quotes", instead of that pass it as column expressions
     // reason: if the expression contains "double quotes" then spark sql parser, ignoring evaluating .. giving to next level to handle
@@ -644,10 +690,9 @@ class PhoenixSparkIT extends AbstractPhoenixSparkIT {
   }
 
   test("Can coerce Phoenix DATE columns to TIMESTAMP through DataFrame API") {
-    val sqlContext = new SQLContext(sc)
-    val df = sqlContext.read
-      .format("org.apache.phoenix.spark")
-      .options(Map("table" -> "DATE_TEST", "zkUrl" -> quorumAddress, "dateAsTimestamp" -> "true"))
+    val df = spark.sqlContext.read
+      .format("phoenix")
+      .options(Map("table" -> "DATE_TEST", PhoenixDataSource.ZOOKEEPER_URL -> quorumAddress, "dateAsTimestamp" -> "true"))
       .load
     val dtRes = df.select("COL1").first()
     val ts = dtRes.getTimestamp(0).getTime
@@ -657,10 +702,9 @@ class PhoenixSparkIT extends AbstractPhoenixSparkIT {
   }
 
   test("Can load Phoenix Time columns through DataFrame API") {
-    val sqlContext = new SQLContext(sc)
-    val df = sqlContext.read
-      .format("org.apache.phoenix.spark")
-      .options(Map("table" -> "TIME_TEST", "zkUrl" -> quorumAddress))
+    val df = spark.sqlContext.read
+      .format("phoenix")
+      .options(Map("table" -> "TIME_TEST", PhoenixDataSource.ZOOKEEPER_URL -> quorumAddress))
       .load
     val time = df.select("COL1").first().getTimestamp(0).getTime
     val epoch = new Date().getTime
@@ -668,13 +712,14 @@ class PhoenixSparkIT extends AbstractPhoenixSparkIT {
   }
 
   test("can read all Phoenix data types") {
-    val sqlContext = new SQLContext(sc)
-    val df = sqlContext.load("org.apache.phoenix.spark", Map("table" -> "GIGANTIC_TABLE",
-      "zkUrl" -> quorumAddress))
+    val df = spark.sqlContext.read
+      .format("phoenix")
+      .options(Map("table" -> "GIGANTIC_TABLE", PhoenixDataSource.ZOOKEEPER_URL -> quorumAddress))
+      .load
 
     df.write
-      .format("org.apache.phoenix.spark")
-      .options(Map("table" -> "OUTPUT_GIGANTIC_TABLE", "zkUrl" -> quorumAddress))
+      .format("phoenix")
+      .options(Map("table" -> "OUTPUT_GIGANTIC_TABLE", PhoenixDataSource.ZOOKEEPER_URL -> quorumAddress))
       .mode(SaveMode.Overwrite)
       .save()
 
diff --git a/phoenix-spark/src/it/scala/org/apache/phoenix/spark/PhoenixSparkITTenantSpecific.scala b/phoenix-spark/src/it/scala/org/apache/phoenix/spark/PhoenixSparkITTenantSpecific.scala
index 77b41af4e6..291ea2a34d 100644
--- a/phoenix-spark/src/it/scala/org/apache/phoenix/spark/PhoenixSparkITTenantSpecific.scala
+++ b/phoenix-spark/src/it/scala/org/apache/phoenix/spark/PhoenixSparkITTenantSpecific.scala
@@ -64,8 +64,7 @@ class PhoenixSparkITTenantSpecific extends AbstractPhoenixSparkIT {
   /*****************/
 
   test("Can read from tenant-specific table as DataFrame") {
-    val sqlContext = new SQLContext(sc)
-    val df = sqlContext.phoenixTableAsDataFrame(
+    val df = spark.sqlContext.phoenixTableAsDataFrame(
       TenantTable,
       Seq(OrgIdCol, TenantOnlyCol),
       zkUrl = Some(quorumAddress),
@@ -78,7 +77,7 @@ class PhoenixSparkITTenantSpecific extends AbstractPhoenixSparkIT {
   }
 
   test("Can read from tenant-specific table as RDD") {
-    val rdd = sc.phoenixTableAsRDD(
+    val rdd = spark.sparkContext.phoenixTableAsRDD(
       TenantTable,
       Seq(OrgIdCol, TenantOnlyCol),
       zkUrl = Some(quorumAddress),
@@ -95,23 +94,23 @@ class PhoenixSparkITTenantSpecific extends AbstractPhoenixSparkIT {
   /*****************/
 
   test("Can write a DataFrame using 'DataFrame.saveToPhoenix' to tenant-specific view") {
-    val sqlContext = new SQLContext(sc)
+    val sqlContext = spark.sqlContext
     import sqlContext.implicits._
 
-    val df = sc.parallelize(TestDataSet).toDF(OrgIdCol, TenantOnlyCol)
+    val df = spark.sparkContext.parallelize(TestDataSet).toDF(OrgIdCol, TenantOnlyCol)
     df.saveToPhoenix(TenantTable, zkUrl = Some(quorumAddress), tenantId = Some(TenantId))
 
     verifyResults
   }
 
   test("Can write a DataFrame using 'DataFrame.write' to tenant-specific view") {
-    val sqlContext = new SQLContext(sc)
+    val sqlContext = spark.sqlContext
     import sqlContext.implicits._
 
-    val df = sc.parallelize(TestDataSet).toDF(OrgIdCol, TenantOnlyCol)
+    val df = spark.sparkContext.parallelize(TestDataSet).toDF(OrgIdCol, TenantOnlyCol)
 
     df.write
-      .format("org.apache.phoenix.spark")
+      .format("phoenix")
       .mode("overwrite")
       .option("table", TenantTable)
       .option(PhoenixRuntime.TENANT_ID_ATTRIB, TenantId)
@@ -122,8 +121,7 @@ class PhoenixSparkITTenantSpecific extends AbstractPhoenixSparkIT {
   }
 
   test("Can write an RDD to Phoenix tenant-specific view") {
-    val sqlContext = new SQLContext(sc)
-    sc
+    spark.sparkContext
       .parallelize(TestDataSet)
       .saveToPhoenix(
         TenantTable,
diff --git a/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/PhoenixDataSource.java b/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/PhoenixDataSource.java
new file mode 100644
index 0000000000..ad79d1c848
--- /dev/null
+++ b/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/PhoenixDataSource.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.spark.datasource.v2;
+
+import java.util.Optional;
+
+import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
+import org.apache.phoenix.spark.datasource.v2.reader.PhoenixDataSourceReader;
+import org.apache.phoenix.spark.datasource.v2.writer.PhoenixDataSourceWriteOptions;
+import org.apache.phoenix.spark.datasource.v2.writer.PhoenixDatasourceWriter;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.spark.sql.SaveMode;
+import org.apache.spark.sql.sources.DataSourceRegister;
+import org.apache.spark.sql.sources.v2.DataSourceOptions;
+import org.apache.spark.sql.sources.v2.DataSourceV2;
+import org.apache.spark.sql.sources.v2.ReadSupport;
+import org.apache.spark.sql.sources.v2.WriteSupport;
+import org.apache.spark.sql.sources.v2.reader.DataSourceReader;
+import org.apache.spark.sql.sources.v2.writer.DataSourceWriter;
+import org.apache.spark.sql.types.StructType;
+
+/**
+ * Implements the DataSourceV2 api to read and write from Phoenix tables
+ */
+public class PhoenixDataSource  implements DataSourceV2,  ReadSupport, WriteSupport, DataSourceRegister {
+
+    public static final String SKIP_NORMALIZING_IDENTIFIER = "skipNormalizingIdentifier";
+    public static final String ZOOKEEPER_URL = "zkUrl";
+
+    @Override
+    public DataSourceReader createReader(DataSourceOptions options) {
+        return new PhoenixDataSourceReader(options);
+    }
+
+    @Override
+    public Optional<DataSourceWriter> createWriter(String writeUUID, StructType schema, SaveMode mode,
+            DataSourceOptions options) {
+        if (!mode.equals(SaveMode.Overwrite)) {
+            throw new RuntimeException("SaveMode other than SaveMode.OverWrite is not supported");
+        }
+        if (!options.tableName().isPresent()) {
+            throw new RuntimeException("No Phoenix option " + DataSourceOptions.TABLE_KEY + " defined");
+        }
+        if (!options.get(PhoenixDataSource.ZOOKEEPER_URL).isPresent()) {
+            throw new RuntimeException("No Phoenix option " + PhoenixDataSource.ZOOKEEPER_URL + " defined");
+        }
+
+        PhoenixDataSourceWriteOptions writeOptions = createPhoenixDataSourceWriteOptions(options, schema);
+        return Optional.of(new PhoenixDatasourceWriter(writeOptions));
+    }
+
+    private PhoenixDataSourceWriteOptions createPhoenixDataSourceWriteOptions(DataSourceOptions options,
+            StructType schema) {
+        String scn = options.get(PhoenixConfigurationUtil.CURRENT_SCN_VALUE).orElse(null);
+        String tenantId = options.get(PhoenixRuntime.TENANT_ID_ATTRIB).orElse(null);
+        String zkUrl = options.get(ZOOKEEPER_URL).get();
+        boolean skipNormalizingIdentifier = options.getBoolean(SKIP_NORMALIZING_IDENTIFIER, false);
+        return new PhoenixDataSourceWriteOptions.Builder().setTableName(options.tableName().get())
+                .setZkUrl(zkUrl).setScn(scn).setTenantId(tenantId).setSchema(schema)
+                .setSkipNormalizingIdentifier(skipNormalizingIdentifier).build();
+    }
+
+    @Override
+    public String shortName() {
+        return "phoenix";
+    }
+}
diff --git a/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixDataSourceReadOptions.java b/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixDataSourceReadOptions.java
new file mode 100644
index 0000000000..8c2fdb16dd
--- /dev/null
+++ b/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixDataSourceReadOptions.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.spark.datasource.v2.reader;
+
+import java.io.Serializable;
+
+public class PhoenixDataSourceReadOptions implements Serializable {
+
+    private final String tenantId;
+    private final String zkUrl;
+    private final String scn;
+    private final String selectStatement;
+
+    public PhoenixDataSourceReadOptions(String zkUrl, String scn, String tenantId, String selectStatement) {
+        this.zkUrl = zkUrl;
+        this.scn = scn;
+        this.tenantId = tenantId;
+        this.selectStatement = selectStatement;
+    }
+
+    public String getSelectStatement() {
+        return selectStatement;
+    }
+
+    public String getScn() {
+        return scn;
+    }
+
+    public String getZkUrl() {
+        return zkUrl;
+    }
+
+    public String getTenantId() {
+        return tenantId;
+    }
+}
diff --git a/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixDataSourceReader.java b/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixDataSourceReader.java
new file mode 100644
index 0000000000..446d96fe2e
--- /dev/null
+++ b/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixDataSourceReader.java
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.spark.datasource.v2.reader;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.RegionLocator;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.util.RegionSizeCalculator;
+import org.apache.phoenix.compile.QueryPlan;
+import org.apache.phoenix.iterate.MapReduceParallelScanGrouper;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.jdbc.PhoenixStatement;
+import org.apache.phoenix.mapreduce.PhoenixInputSplit;
+import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
+import org.apache.phoenix.query.KeyRange;
+import org.apache.phoenix.spark.FilterExpressionCompiler;
+import org.apache.phoenix.spark.SparkSchemaUtil;
+import org.apache.phoenix.spark.datasource.v2.PhoenixDataSource;
+import org.apache.phoenix.util.ColumnInfo;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.phoenix.util.QueryUtil;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.sources.Filter;
+import org.apache.spark.sql.sources.v2.DataSourceOptions;
+import org.apache.spark.sql.sources.v2.reader.DataSourceReader;
+import org.apache.spark.sql.sources.v2.reader.InputPartition;
+import org.apache.spark.sql.sources.v2.reader.SupportsPushDownFilters;
+import org.apache.spark.sql.sources.v2.reader.SupportsPushDownRequiredColumns;
+import org.apache.spark.sql.types.StructType;
+import scala.Tuple3;
+import scala.collection.JavaConverters;
+import scala.collection.Seq;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.Properties;
+
+public class PhoenixDataSourceReader implements DataSourceReader, SupportsPushDownFilters,
+        SupportsPushDownRequiredColumns {
+
+    private final DataSourceOptions options;
+    private final String tableName;
+    private final String zkUrl;
+    private final boolean dateAsTimestamp;
+
+    private StructType schema;
+    private Filter[] pushedFilters = new Filter[]{};
+    // derived from pushedFilters
+    private String whereClause;
+
+    public PhoenixDataSourceReader(DataSourceOptions options) {
+        if (!options.tableName().isPresent()) {
+            throw new RuntimeException("No Phoenix option " + DataSourceOptions.TABLE_KEY + " defined");
+        }
+        if (!options.get(PhoenixDataSource.ZOOKEEPER_URL).isPresent()) {
+            throw new RuntimeException("No Phoenix option " + PhoenixDataSource.ZOOKEEPER_URL + " defined");
+        }
+        this.options = options;
+        this.tableName = options.tableName().get();
+        this.zkUrl = options.get("zkUrl").get();
+        this.dateAsTimestamp = options.getBoolean("dateAsTimestamp", false);
+        setSchema();
+    }
+
+    /**
+     * Sets the schema using all the table columns before any column pruning has been done
+     */
+    private void setSchema() {
+        try (Connection conn = DriverManager.getConnection("jdbc:phoenix:" + zkUrl)) {
+            List<ColumnInfo> columnInfos = PhoenixRuntime.generateColumnInfo(conn, tableName, null);
+            Seq<ColumnInfo> columnInfoSeq = JavaConverters.asScalaIteratorConverter(columnInfos.iterator()).asScala().toSeq();
+            schema = SparkSchemaUtil.phoenixSchemaToCatalystSchema(columnInfoSeq, dateAsTimestamp);
+        }
+        catch (SQLException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public StructType readSchema() {
+        return schema;
+    }
+
+    @Override
+    public Filter[] pushFilters(Filter[] filters) {
+        Tuple3<String, Filter[], Filter[]> tuple3 = new FilterExpressionCompiler().pushFilters(filters);
+        whereClause = tuple3._1();
+        pushedFilters = tuple3._3();
+        return tuple3._2();
+    }
+
+    @Override
+    public List<InputPartition<InternalRow>> planInputPartitions() {
+        Optional<String> currentScnValue = options.get(PhoenixConfigurationUtil.CURRENT_SCN_VALUE);
+        Optional<String> tenantId = options.get(PhoenixConfigurationUtil.MAPREDUCE_TENANT_ID);
+        // Generate splits based off statistics, or just region splits?
+        boolean splitByStats = options.getBoolean(
+                PhoenixConfigurationUtil.MAPREDUCE_SPLIT_BY_STATS, PhoenixConfigurationUtil.DEFAULT_SPLIT_BY_STATS);
+        Properties overridingProps = new Properties();
+        if(currentScnValue.isPresent()) {
+            overridingProps.put(PhoenixRuntime.CURRENT_SCN_ATTRIB, currentScnValue.get());
+        }
+        if (tenantId.isPresent()){
+            overridingProps.put(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId.get());
+        }
+        try (Connection conn = DriverManager.getConnection("jdbc:phoenix:" + zkUrl, overridingProps)) {
+            List<ColumnInfo> columnInfos = PhoenixRuntime.generateColumnInfo(conn, tableName, Lists.newArrayList(schema.names()));
+            final Statement statement = conn.createStatement();
+            final String selectStatement = QueryUtil.constructSelectStatement(tableName, columnInfos, whereClause);
+            Preconditions.checkNotNull(selectStatement);
+
+            final PhoenixStatement pstmt = statement.unwrap(PhoenixStatement.class);
+            // Optimize the query plan so that we potentially use secondary indexes
+            final QueryPlan queryPlan = pstmt.optimizeQuery(selectStatement);
+            final Scan scan = queryPlan.getContext().getScan();
+
+            // setting the snapshot configuration
+            Optional<String> snapshotName = options.get(PhoenixConfigurationUtil.SNAPSHOT_NAME_KEY);
+            if (snapshotName.isPresent())
+                PhoenixConfigurationUtil.setSnapshotNameKey(queryPlan.getContext().getConnection().
+                        getQueryServices().getConfiguration(), snapshotName.get());
+
+            // Initialize the query plan so it sets up the parallel scans
+            queryPlan.iterator(MapReduceParallelScanGrouper.getInstance());
+
+            List<KeyRange> allSplits = queryPlan.getSplits();
+            // Get the RegionSizeCalculator
+            PhoenixConnection phxConn = conn.unwrap(PhoenixConnection.class);
+            org.apache.hadoop.hbase.client.Connection connection =
+                    phxConn.getQueryServices().getAdmin().getConnection();
+            RegionLocator regionLocator = connection.getRegionLocator(TableName.valueOf(queryPlan
+                    .getTableRef().getTable().getPhysicalName().toString()));
+            RegionSizeCalculator sizeCalculator = new RegionSizeCalculator(regionLocator, connection
+                    .getAdmin());
+
+            final List<InputPartition<InternalRow>> partitions = Lists.newArrayListWithExpectedSize(allSplits.size());
+            for (List<Scan> scans : queryPlan.getScans()) {
+                // Get the region location
+                HRegionLocation location = regionLocator.getRegionLocation(
+                        scans.get(0).getStartRow(),
+                        false
+                );
+
+                String regionLocation = location.getHostname();
+
+                // Get the region size
+                long regionSize = sizeCalculator.getRegionSize(
+                        location.getRegionInfo().getRegionName()
+                );
+
+                PhoenixDataSourceReadOptions phoenixDataSourceOptions = new PhoenixDataSourceReadOptions(zkUrl,
+                        currentScnValue.orElse(null), tenantId.orElse(null), selectStatement);
+                if (splitByStats) {
+                    for (Scan aScan : scans) {
+                        partitions.add(new PhoenixInputPartition(phoenixDataSourceOptions, schema,
+                                new PhoenixInputSplit(Collections.singletonList(aScan), regionSize, regionLocation)));
+                    }
+                } else {
+                    partitions.add(new PhoenixInputPartition(phoenixDataSourceOptions, schema,
+                            new PhoenixInputSplit(scans, regionSize, regionLocation)));
+                }
+            }
+            return partitions;
+        } catch (Exception e) {
+            throw new RuntimeException("Unable to plan query", e);
+        }
+    }
+
+    @Override
+    public Filter[] pushedFilters() {
+        return pushedFilters;
+    }
+
+    @Override
+    public void pruneColumns(StructType schema) {
+        this.schema = schema;
+    }
+}
diff --git a/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixInputPartition.java b/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixInputPartition.java
new file mode 100644
index 0000000000..624ff0fd55
--- /dev/null
+++ b/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixInputPartition.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.spark.datasource.v2.reader;
+
+import org.apache.phoenix.mapreduce.PhoenixInputSplit;
+import org.apache.spark.SerializableWritable;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.sources.v2.reader.InputPartition;
+import org.apache.spark.sql.sources.v2.reader.InputPartitionReader;
+import org.apache.spark.sql.types.StructType;
+
+public class PhoenixInputPartition implements InputPartition<InternalRow> {
+
+    private SerializableWritable<PhoenixInputSplit> phoenixInputSplit;
+    private StructType schema;
+    private PhoenixDataSourceReadOptions options;
+
+    public PhoenixInputPartition(PhoenixDataSourceReadOptions options, StructType schema, PhoenixInputSplit phoenixInputSplit) {
+        this.phoenixInputSplit = new SerializableWritable<>(phoenixInputSplit);
+        this.schema = schema;
+        this.options = options;
+    }
+
+    @Override
+    public InputPartitionReader<InternalRow> createPartitionReader() {
+        return new PhoenixInputPartitionReader(options, schema, phoenixInputSplit);
+    }
+
+}
diff --git a/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixInputPartitionReader.java b/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixInputPartitionReader.java
new file mode 100644
index 0000000000..30e84dbd76
--- /dev/null
+++ b/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixInputPartitionReader.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.spark.datasource.v2.reader;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.compile.QueryPlan;
+import org.apache.phoenix.compile.StatementContext;
+import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
+import org.apache.phoenix.iterate.ConcatResultIterator;
+import org.apache.phoenix.iterate.LookAheadResultIterator;
+import org.apache.phoenix.iterate.MapReduceParallelScanGrouper;
+import org.apache.phoenix.iterate.PeekingResultIterator;
+import org.apache.phoenix.iterate.ResultIterator;
+import org.apache.phoenix.iterate.RoundRobinResultIterator;
+import org.apache.phoenix.iterate.SequenceResultIterator;
+import org.apache.phoenix.iterate.TableResultIterator;
+import org.apache.phoenix.jdbc.PhoenixResultSet;
+import org.apache.phoenix.jdbc.PhoenixStatement;
+import org.apache.phoenix.mapreduce.PhoenixInputSplit;
+import org.apache.phoenix.monitoring.ReadMetricQueue;
+import org.apache.phoenix.monitoring.ScanMetricsHolder;
+import org.apache.phoenix.query.ConnectionQueryServices;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.spark.SerializableWritable;
+import org.apache.spark.executor.InputMetrics;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.execution.datasources.SparkJdbcUtil;
+import org.apache.spark.sql.sources.v2.reader.InputPartitionReader;
+import org.apache.spark.sql.types.StructType;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+
+import scala.collection.Iterator;
+
+public class PhoenixInputPartitionReader implements InputPartitionReader<InternalRow>  {
+
+    private SerializableWritable<PhoenixInputSplit> phoenixInputSplit;
+    private StructType schema;
+    private Iterator<InternalRow> iterator;
+    private PhoenixResultSet resultSet;
+    private InternalRow currentRow;
+    private PhoenixDataSourceReadOptions options;
+
+    public PhoenixInputPartitionReader(PhoenixDataSourceReadOptions options, StructType schema, SerializableWritable<PhoenixInputSplit> phoenixInputSplit) {
+        this.options = options;
+        this.phoenixInputSplit = phoenixInputSplit;
+        this.schema = schema;
+        initialize();
+    }
+
+    private QueryPlan getQueryPlan() throws SQLException {
+        String scn = options.getScn();
+        String tenantId = options.getTenantId();
+        String zkUrl = options.getZkUrl();
+        Properties overridingProps = new Properties();
+        if (scn != null) {
+            overridingProps.put(PhoenixRuntime.CURRENT_SCN_ATTRIB, scn);
+        }
+        if (tenantId != null) {
+            overridingProps.put(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId);
+        }
+        try (Connection conn = DriverManager.getConnection("jdbc:phoenix:" + zkUrl, overridingProps)) {
+            final Statement statement = conn.createStatement();
+            final String selectStatement = options.getSelectStatement();
+            Preconditions.checkNotNull(selectStatement);
+
+            final PhoenixStatement pstmt = statement.unwrap(PhoenixStatement.class);
+            // Optimize the query plan so that we potentially use secondary indexes
+            final QueryPlan queryPlan = pstmt.optimizeQuery(selectStatement);
+            return queryPlan;
+        }
+    }
+
+    private void initialize() {
+        try {
+            final QueryPlan queryPlan = getQueryPlan();
+            final List<Scan> scans = phoenixInputSplit.value().getScans();
+            List<PeekingResultIterator> iterators = Lists.newArrayListWithExpectedSize(scans.size());
+            StatementContext ctx = queryPlan.getContext();
+            ReadMetricQueue readMetrics = ctx.getReadMetricsQueue();
+            String tableName = queryPlan.getTableRef().getTable().getPhysicalName().getString();
+
+            // Clear the table region boundary cache to make sure long running jobs stay up to date
+            byte[] tableNameBytes = queryPlan.getTableRef().getTable().getPhysicalName().getBytes();
+            ConnectionQueryServices services = queryPlan.getContext().getConnection().getQueryServices();
+            services.clearTableRegionCache(tableNameBytes);
+
+            long renewScannerLeaseThreshold = queryPlan.getContext().getConnection().getQueryServices().getRenewLeaseThresholdMilliSeconds();
+            for (Scan scan : scans) {
+                // For MR, skip the region boundary check exception if we encounter a split. ref: PHOENIX-2599
+                scan.setAttribute(BaseScannerRegionObserver.SKIP_REGION_BOUNDARY_CHECK, Bytes.toBytes(true));
+
+                PeekingResultIterator peekingResultIterator;
+                ScanMetricsHolder scanMetricsHolder =
+                        ScanMetricsHolder.getInstance(readMetrics, tableName, scan,
+                                queryPlan.getContext().getConnection().getLogLevel());
+                final TableResultIterator tableResultIterator =
+                        new TableResultIterator(
+                                queryPlan.getContext().getConnection().getMutationState(), scan,
+                                scanMetricsHolder, renewScannerLeaseThreshold, queryPlan,
+                                MapReduceParallelScanGrouper.getInstance());
+                peekingResultIterator = LookAheadResultIterator.wrap(tableResultIterator);
+                iterators.add(peekingResultIterator);
+            }
+            ResultIterator iterator = queryPlan.useRoundRobinIterator() ? RoundRobinResultIterator.newIterator(iterators, queryPlan) : ConcatResultIterator.newIterator(iterators);
+            if (queryPlan.getContext().getSequenceManager().getSequenceCount() > 0) {
+                iterator = new SequenceResultIterator(iterator, queryPlan.getContext().getSequenceManager());
+            }
+            // Clone the row projector as it's not thread safe and would be used simultaneously by
+            // multiple threads otherwise.
+            this.resultSet = new PhoenixResultSet(iterator, queryPlan.getProjector().cloneIfNecessary(), queryPlan.getContext());
+            this.iterator = SparkJdbcUtil.resultSetToSparkInternalRows(resultSet, schema, new InputMetrics());
+        }
+        catch (SQLException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public boolean next() {
+        if (!iterator.hasNext()) {
+            return false;
+        }
+        currentRow = iterator.next();
+        return true;
+    }
+
+    @Override
+    public InternalRow get() {
+        return currentRow;
+    }
+
+    @Override
+    public void close() throws IOException {
+        if(resultSet != null) {
+            try {
+                resultSet.close();
+            } catch (SQLException e) {
+                throw new IOException(e);
+            }
+        }
+    }
+}
diff --git a/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixDataSourceWriteOptions.java b/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixDataSourceWriteOptions.java
new file mode 100644
index 0000000000..781d1c83dc
--- /dev/null
+++ b/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixDataSourceWriteOptions.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.spark.datasource.v2.writer;
+
+import org.apache.spark.sql.types.StructType;
+
+import java.io.Serializable;
+
+public class PhoenixDataSourceWriteOptions implements Serializable {
+
+    private final String tableName;
+    private final String zkUrl;
+    private final String tenantId;
+    private final String scn;
+    private final StructType schema;
+    private final boolean skipNormalizingIdentifier;
+
+    private PhoenixDataSourceWriteOptions(String tableName, String zkUrl, String scn, String tenantId,
+                                          StructType schema, boolean skipNormalizingIdentifier) {
+        this.tableName = tableName;
+        this.zkUrl = zkUrl;
+        this.scn = scn;
+        this.tenantId = tenantId;
+        this.schema = schema;
+        this.skipNormalizingIdentifier = skipNormalizingIdentifier;
+    }
+
+    public String getScn() {
+        return scn;
+    }
+
+    public String getZkUrl() {
+        return zkUrl;
+    }
+
+    public String getTenantId() {
+        return tenantId;
+    }
+
+    public StructType getSchema() {
+        return schema;
+    }
+
+    public String getTableName() {
+        return tableName;
+    }
+
+    public boolean skipNormalizingIdentifier() {
+        return skipNormalizingIdentifier;
+    }
+
+    public static class Builder {
+        private String tableName;
+        private String zkUrl;
+        private String scn;
+        private String tenantId;
+        private StructType schema;
+        private boolean skipNormalizingIdentifier;
+
+        public Builder setTableName(String tableName) {
+            this.tableName = tableName;
+            return this;
+        }
+
+        public Builder setZkUrl(String zkUrl) {
+            this.zkUrl = zkUrl;
+            return this;
+        }
+
+        public Builder setScn(String scn) {
+            this.scn = scn;
+            return this;
+        }
+
+        public Builder setTenantId(String tenantId) {
+            this.tenantId = tenantId;
+            return this;
+        }
+
+        public Builder setSchema(StructType schema) {
+            this.schema = schema;
+            return this;
+        }
+
+        public Builder setSkipNormalizingIdentifier(boolean skipNormalizingIdentifier) {
+            this.skipNormalizingIdentifier = skipNormalizingIdentifier;
+            return this;
+        }
+
+        public PhoenixDataSourceWriteOptions build() {
+            return new PhoenixDataSourceWriteOptions(tableName, zkUrl, scn, tenantId, schema, skipNormalizingIdentifier);
+        }
+    }
+}
\ No newline at end of file
diff --git a/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixDataWriter.java b/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixDataWriter.java
new file mode 100644
index 0000000000..cf42aa5757
--- /dev/null
+++ b/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixDataWriter.java
@@ -0,0 +1,100 @@
+package org.apache.phoenix.spark.datasource.v2.writer;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.List;
+import java.util.Properties;
+import java.util.stream.Collectors;
+
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.phoenix.util.QueryUtil;
+import org.apache.phoenix.util.SchemaUtil;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.execution.datasources.SparkJdbcUtil;
+import org.apache.spark.sql.execution.datasources.jdbc.PhoenixJdbcDialect$;
+import org.apache.spark.sql.sources.v2.writer.DataWriter;
+import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+
+import com.google.common.collect.Lists;
+
+public class PhoenixDataWriter implements DataWriter<InternalRow> {
+
+    private final StructType schema;
+    private final Connection conn;
+    private final PreparedStatement statement;
+
+    public PhoenixDataWriter(PhoenixDataSourceWriteOptions options) {
+        String scn = options.getScn();
+        String tenantId = options.getTenantId();
+        String zkUrl = options.getZkUrl();
+        Properties overridingProps = new Properties();
+        if (scn != null) {
+            overridingProps.put(PhoenixRuntime.CURRENT_SCN_ATTRIB, scn);
+        }
+        if (tenantId != null) {
+            overridingProps.put(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId);
+        }
+        this.schema = options.getSchema();
+        try {
+            this.conn = DriverManager.getConnection("jdbc:phoenix:" + zkUrl, overridingProps);
+            List<String> colNames = Lists.newArrayList(options.getSchema().names());
+            if (!options.skipNormalizingIdentifier()){
+                colNames = colNames.stream().map(colName -> SchemaUtil.normalizeIdentifier(colName)).collect(Collectors.toList());
+            }
+            String upsertSql = QueryUtil.constructUpsertStatement(options.getTableName(), colNames, null);
+            this.statement = this.conn.prepareStatement(upsertSql);
+        } catch (SQLException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public void write(InternalRow internalRow) throws IOException {
+        try {
+            int i=0;
+            for (StructField field : schema.fields()) {
+                DataType dataType = field.dataType();
+                if (internalRow.isNullAt(i)) {
+                    statement.setNull(i + 1, SparkJdbcUtil.getJdbcType(dataType,
+                            PhoenixJdbcDialect$.MODULE$).jdbcNullType());
+                } else {
+                    Row row = SparkJdbcUtil.toRow(schema, internalRow);
+                    SparkJdbcUtil.makeSetter(conn, PhoenixJdbcDialect$.MODULE$, dataType).apply(statement, row, i);
+                }
+                ++i;
+            }
+            statement.execute();
+        } catch (SQLException e) {
+            throw new IOException("Exception while executing Phoenix prepared statement", e);
+        }
+    }
+
+    @Override
+    public WriterCommitMessage commit() throws IOException {
+        try {
+            conn.commit();
+        } catch (SQLException e) {
+            throw new RuntimeException(e);
+        } finally {
+            try {
+                statement.close();
+                conn.close();
+            }
+            catch (SQLException ex) {
+                throw new RuntimeException(ex);
+            }
+        }
+        return null;
+    }
+
+    @Override
+    public void abort() throws IOException {
+    }
+}
diff --git a/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixDataWriterFactory.java b/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixDataWriterFactory.java
new file mode 100644
index 0000000000..751fdfa721
--- /dev/null
+++ b/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixDataWriterFactory.java
@@ -0,0 +1,19 @@
+package org.apache.phoenix.spark.datasource.v2.writer;
+
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.sources.v2.writer.DataWriter;
+import org.apache.spark.sql.sources.v2.writer.DataWriterFactory;
+
+public class PhoenixDataWriterFactory implements DataWriterFactory<InternalRow> {
+
+    private final PhoenixDataSourceWriteOptions options;
+
+    public PhoenixDataWriterFactory(PhoenixDataSourceWriteOptions options) {
+        this.options = options;
+    }
+
+    @Override
+    public DataWriter<InternalRow> createDataWriter(int partitionId, long taskId, long epochId) {
+        return new PhoenixDataWriter(options);
+    }
+}
diff --git a/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixDatasourceWriter.java b/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixDatasourceWriter.java
new file mode 100644
index 0000000000..7847609737
--- /dev/null
+++ b/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixDatasourceWriter.java
@@ -0,0 +1,34 @@
+package org.apache.phoenix.spark.datasource.v2.writer;
+
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.sources.v2.DataSourceOptions;
+import org.apache.spark.sql.sources.v2.writer.DataSourceWriter;
+import org.apache.spark.sql.sources.v2.writer.DataWriterFactory;
+import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage;
+
+public class PhoenixDatasourceWriter implements DataSourceWriter {
+
+    private final PhoenixDataSourceWriteOptions options;
+
+    public PhoenixDatasourceWriter(PhoenixDataSourceWriteOptions options) {
+        this.options = options;
+    }
+
+    @Override
+    public DataWriterFactory<InternalRow> createWriterFactory() {
+        return new PhoenixDataWriterFactory(options);
+    }
+
+    @Override
+    public boolean useCommitCoordinator() {
+        return false;
+    }
+
+    @Override
+    public void commit(WriterCommitMessage[] messages) {
+    }
+
+    @Override
+    public void abort(WriterCommitMessage[] messages) {
+    }
+}
diff --git a/phoenix-spark/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister b/phoenix-spark/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
new file mode 100644
index 0000000000..6eff1af315
--- /dev/null
+++ b/phoenix-spark/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
@@ -0,0 +1 @@
+org.apache.phoenix.spark.datasource.v2.PhoenixDataSource
\ No newline at end of file
diff --git a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/ConfigurationUtil.scala b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/ConfigurationUtil.scala
index ca476e7ffb..d555954b9e 100644
--- a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/ConfigurationUtil.scala
+++ b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/ConfigurationUtil.scala
@@ -21,6 +21,7 @@ import org.apache.phoenix.util.{ColumnInfo, PhoenixRuntime}
 
 import scala.collection.JavaConversions._
 
+@deprecated("Use the DataSource V2 API implementation (see PhoenixDataSource)")
 object ConfigurationUtil extends Serializable {
 
   def getOutputConfiguration(tableName: String, columns: Seq[String], zkUrl: Option[String], tenantId: Option[String] = None, conf: Option[Configuration] = None): Configuration = {
diff --git a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/DataFrameFunctions.scala b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/DataFrameFunctions.scala
index be4a32bc39..3b0289d28f 100644
--- a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/DataFrameFunctions.scala
+++ b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/DataFrameFunctions.scala
@@ -22,7 +22,7 @@ import org.apache.spark.sql.DataFrame
 
 import scala.collection.JavaConversions._
 
-
+@deprecated("Use the DataSource V2 API implementation (see PhoenixDataSource)")
 class DataFrameFunctions(data: DataFrame) extends Serializable {
   def saveToPhoenix(parameters: Map[String, String]): Unit = {
   		saveToPhoenix(parameters("table"), zkUrl = parameters.get("zkUrl"), tenantId = parameters.get("TenantId"), 
diff --git a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/DefaultSource.scala b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/DefaultSource.scala
index e000b74da2..ccdf595652 100644
--- a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/DefaultSource.scala
+++ b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/DefaultSource.scala
@@ -20,6 +20,7 @@ package org.apache.phoenix.spark
 import org.apache.spark.sql.sources.{BaseRelation, CreatableRelationProvider, RelationProvider}
 import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode}
 
+@deprecated("Use the DataSource V2 API implementation (see PhoenixDataSource)")
 class DefaultSource extends RelationProvider with CreatableRelationProvider {
 
   // Override 'RelationProvider.createRelation', this enables DataFrame.load()
diff --git a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/FilterExpressionCompiler.scala b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/FilterExpressionCompiler.scala
new file mode 100644
index 0000000000..74ff67e348
--- /dev/null
+++ b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/FilterExpressionCompiler.scala
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.spark
+
+import java.sql.Timestamp
+import java.text.Format
+
+import org.apache.phoenix.util.{DateUtil, SchemaUtil}
+import org.apache.phoenix.util.StringUtil.escapeStringConstant
+import org.apache.spark.sql.sources._
+
+class FilterExpressionCompiler() {
+
+  val timeformatter:Format = DateUtil.getTimestampFormatter(DateUtil.DEFAULT_TIME_FORMAT, DateUtil.DEFAULT_TIME_ZONE_ID)
+
+  /**
+    * Attempt to create Phoenix-accepted WHERE clause from Spark filters,
+    * mostly inspired from Spark SQL JDBCRDD and the couchbase-spark-connector
+    *
+    * @return tuple representing where clause (derived from supported filters),
+    *         array of unsupported filters and array of supported filters
+    */
+  def pushFilters(filters: Array[Filter]): (String, Array[Filter], Array[Filter]) = {
+    if (filters.isEmpty) {
+      return ("" , Array[Filter](), Array[Filter]())
+    }
+
+    val filter = new StringBuilder("")
+    val unsupportedFilters = Array[Filter]();
+    var i = 0
+
+    filters.foreach(f => {
+      // Assume conjunction for multiple filters, unless otherwise specified
+      if (i > 0) {
+        filter.append(" AND")
+      }
+
+      f match {
+        // Spark 1.3.1+ supported filters
+        case And(leftFilter, rightFilter)  => {
+          val(whereClause, currUnsupportedFilters, _) = pushFilters(Array(leftFilter, rightFilter))
+          if (currUnsupportedFilters.isEmpty)
+            filter.append(whereClause)
+          else
+            unsupportedFilters :+ f
+        }
+        case Or(leftFilter, rightFilter) => {
+          val(whereLeftClause, leftUnsupportedFilters, _) = pushFilters(Array(leftFilter))
+          val(whereRightClause, rightUnsupportedFilters, _) = pushFilters(Array(rightFilter))
+          if (leftUnsupportedFilters.isEmpty && rightUnsupportedFilters.isEmpty) {
+            filter.append(whereLeftClause + " OR " + whereRightClause)
+          }
+          else {
+            unsupportedFilters :+ f
+          }
+        }
+        case Not(aFilter) => {
+          val(whereClause, currUnsupportedFilters, _) = pushFilters(Array(aFilter))
+          if (currUnsupportedFilters.isEmpty)
+            filter.append(" NOT " + whereClause)
+          else
+            unsupportedFilters :+ f
+        }
+        case EqualTo(attr, value) => filter.append(s" ${escapeKey(attr)} = ${compileValue(value)}")
+        case GreaterThan(attr, value) => filter.append(s" ${escapeKey(attr)} > ${compileValue(value)}")
+        case GreaterThanOrEqual(attr, value) => filter.append(s" ${escapeKey(attr)} >= ${compileValue(value)}")
+        case LessThan(attr, value) => filter.append(s" ${escapeKey(attr)} < ${compileValue(value)}")
+        case LessThanOrEqual(attr, value) => filter.append(s" ${escapeKey(attr)} <= ${compileValue(value)}")
+        case IsNull(attr) => filter.append(s" ${escapeKey(attr)} IS NULL")
+        case IsNotNull(attr) => filter.append(s" ${escapeKey(attr)} IS NOT NULL")
+        case In(attr, values) => filter.append(s" ${escapeKey(attr)} IN ${values.map(compileValue).mkString("(", ",", ")")}")
+        case StringStartsWith(attr, value) => filter.append(s" ${escapeKey(attr)} LIKE ${compileValue(value + "%")}")
+        case StringEndsWith(attr, value) => filter.append(s" ${escapeKey(attr)} LIKE ${compileValue("%" + value)}")
+        case StringContains(attr, value) => filter.append(s" ${escapeKey(attr)} LIKE ${compileValue("%" + value + "%")}")
+        case _ => unsupportedFilters :+ f
+      }
+
+      i = i + 1
+    })
+
+    (filter.toString(), unsupportedFilters, filters diff unsupportedFilters)
+  }
+
+  // Helper function to escape string values in SQL queries
+  private def compileValue(value: Any): Any = value match {
+    case stringValue: String => s"'${escapeStringConstant(stringValue)}'"
+
+    case timestampValue: Timestamp => getTimestampString(timestampValue)
+
+    // Borrowed from 'elasticsearch-hadoop', support these internal UTF types across Spark versions
+    // Spark 1.4
+    case utf if (isClass(utf, "org.apache.spark.sql.types.UTF8String")) => s"'${escapeStringConstant(utf.toString)}'"
+    // Spark 1.5
+    case utf if (isClass(utf, "org.apache.spark.unsafe.types.UTF8String")) => s"'${escapeStringConstant(utf.toString)}'"
+
+    // Pass through anything else
+    case _ => value
+  }
+
+  private def getTimestampString(timestampValue: Timestamp): String = {
+    "TO_TIMESTAMP('%s', '%s', '%s')".format(timeformatter.format(timestampValue),
+      DateUtil.DEFAULT_TIME_FORMAT, DateUtil.DEFAULT_TIME_ZONE_ID)
+  }
+
+  // Helper function to escape column key to work with SQL queries
+  private def escapeKey(key: String): String = SchemaUtil.getEscapedFullColumnName(key)
+
+  private def isClass(obj: Any, className: String) = {
+    className.equals(obj.getClass().getName())
+  }
+}
diff --git a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRDD.scala b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRDD.scala
index d604e0e0fe..7331a5fd02 100644
--- a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRDD.scala
+++ b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRDD.scala
@@ -21,9 +21,6 @@ import org.apache.hadoop.io.NullWritable
 import org.apache.phoenix.jdbc.PhoenixDriver
 import org.apache.phoenix.mapreduce.PhoenixInputFormat
 import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil
-import org.apache.phoenix.query.QueryConstants
-import org.apache.phoenix.schema.types._
-import org.apache.phoenix.util.{ColumnInfo, SchemaUtil}
 import org.apache.spark._
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.rdd.RDD
@@ -32,6 +29,7 @@ import org.apache.spark.sql.{DataFrame, Row, SQLContext}
 
 import scala.collection.JavaConverters._
 
+@deprecated("Use the DataSource V2 API implementation (see PhoenixDataSource)")
 class PhoenixRDD(sc: SparkContext, table: String, columns: Seq[String],
                  predicate: Option[String] = None,
                  zkUrl: Option[String] = None,
@@ -126,7 +124,7 @@ class PhoenixRDD(sc: SparkContext, table: String, columns: Seq[String],
 
 
     // Lookup the Spark catalyst types from the Phoenix schema
-    val structFields = phoenixSchemaToCatalystSchema(columnInfoList).toArray
+    val structType = SparkSchemaUtil.phoenixSchemaToCatalystSchema(columnInfoList, dateAsTimestamp)
 
     // Create the data frame from the converted Spark schema
     sqlContext.createDataFrame(map(pr => {
@@ -146,60 +144,7 @@ class PhoenixRDD(sc: SparkContext, table: String, columns: Seq[String],
 
       // Create a Spark Row from the sequence
       Row.fromSeq(rowSeq)
-    }), new StructType(structFields))
+    }), structType)
   }
 
-  def normalizeColumnName(columnName: String) = {
-    val unescapedColumnName = SchemaUtil.getUnEscapedFullColumnName(columnName)
-    var normalizedColumnName = ""
-    if (unescapedColumnName.indexOf(QueryConstants.NAME_SEPARATOR) < 0) {
-      normalizedColumnName = unescapedColumnName
-    }
-    else {
-      // split by separator to get the column family and column name
-      val tokens = unescapedColumnName.split(QueryConstants.NAME_SEPARATOR_REGEX)
-      normalizedColumnName = if (tokens(0) == "0") tokens(1) else unescapedColumnName
-    }
-    normalizedColumnName
-  }
-
-  def phoenixSchemaToCatalystSchema(columnList: Seq[ColumnInfo]) = columnList.map(ci => {
-    val structType = phoenixTypeToCatalystType(ci)
-    StructField(normalizeColumnName(ci.getColumnName), structType)
-  })
-
-
-  // Lookup table for Phoenix types to Spark catalyst types
-  def phoenixTypeToCatalystType(columnInfo: ColumnInfo): DataType = columnInfo.getPDataType match {
-    case t if t.isInstanceOf[PVarchar] || t.isInstanceOf[PChar] => StringType
-    case t if t.isInstanceOf[PLong] || t.isInstanceOf[PUnsignedLong] => LongType
-    case t if t.isInstanceOf[PInteger] || t.isInstanceOf[PUnsignedInt] => IntegerType
-    case t if t.isInstanceOf[PSmallint] || t.isInstanceOf[PUnsignedSmallint] => ShortType
-    case t if t.isInstanceOf[PTinyint] || t.isInstanceOf[PUnsignedTinyint] => ByteType
-    case t if t.isInstanceOf[PFloat] || t.isInstanceOf[PUnsignedFloat] => FloatType
-    case t if t.isInstanceOf[PDouble] || t.isInstanceOf[PUnsignedDouble] => DoubleType
-    // Use Spark system default precision for now (explicit to work with < 1.5)
-    case t if t.isInstanceOf[PDecimal] =>
-      if (columnInfo.getPrecision == null || columnInfo.getPrecision < 0) DecimalType(38, 18) else DecimalType(columnInfo.getPrecision, columnInfo.getScale)
-    case t if t.isInstanceOf[PTimestamp] || t.isInstanceOf[PUnsignedTimestamp] => TimestampType
-    case t if t.isInstanceOf[PTime] || t.isInstanceOf[PUnsignedTime] => TimestampType
-    case t if (t.isInstanceOf[PDate] || t.isInstanceOf[PUnsignedDate]) && !dateAsTimestamp => DateType
-    case t if (t.isInstanceOf[PDate] || t.isInstanceOf[PUnsignedDate]) && dateAsTimestamp => TimestampType
-    case t if t.isInstanceOf[PBoolean] => BooleanType
-    case t if t.isInstanceOf[PVarbinary] || t.isInstanceOf[PBinary] => BinaryType
-    case t if t.isInstanceOf[PIntegerArray] || t.isInstanceOf[PUnsignedIntArray] => ArrayType(IntegerType, containsNull = true)
-    case t if t.isInstanceOf[PBooleanArray] => ArrayType(BooleanType, containsNull = true)
-    case t if t.isInstanceOf[PVarcharArray] || t.isInstanceOf[PCharArray] => ArrayType(StringType, containsNull = true)
-    case t if t.isInstanceOf[PVarbinaryArray] || t.isInstanceOf[PBinaryArray] => ArrayType(BinaryType, containsNull = true)
-    case t if t.isInstanceOf[PLongArray] || t.isInstanceOf[PUnsignedLongArray] => ArrayType(LongType, containsNull = true)
-    case t if t.isInstanceOf[PSmallintArray] || t.isInstanceOf[PUnsignedSmallintArray] => ArrayType(IntegerType, containsNull = true)
-    case t if t.isInstanceOf[PTinyintArray] || t.isInstanceOf[PUnsignedTinyintArray] => ArrayType(ByteType, containsNull = true)
-    case t if t.isInstanceOf[PFloatArray] || t.isInstanceOf[PUnsignedFloatArray] => ArrayType(FloatType, containsNull = true)
-    case t if t.isInstanceOf[PDoubleArray] || t.isInstanceOf[PUnsignedDoubleArray] => ArrayType(DoubleType, containsNull = true)
-    case t if t.isInstanceOf[PDecimalArray] => ArrayType(
-      if (columnInfo.getPrecision == null || columnInfo.getPrecision < 0) DecimalType(38, 18) else DecimalType(columnInfo.getPrecision, columnInfo.getScale), containsNull = true)
-    case t if t.isInstanceOf[PTimestampArray] || t.isInstanceOf[PUnsignedTimestampArray] => ArrayType(TimestampType, containsNull = true)
-    case t if t.isInstanceOf[PDateArray] || t.isInstanceOf[PUnsignedDateArray] => ArrayType(TimestampType, containsNull = true)
-    case t if t.isInstanceOf[PTimeArray] || t.isInstanceOf[PUnsignedTimeArray] => ArrayType(TimestampType, containsNull = true)
-  }
 }
diff --git a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRecordWritable.scala b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRecordWritable.scala
index c35cc54575..6d4c4ccf16 100644
--- a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRecordWritable.scala
+++ b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRecordWritable.scala
@@ -20,7 +20,7 @@ import org.apache.phoenix.util.ColumnInfo
 import org.joda.time.DateTime
 import scala.collection.{mutable, immutable}
 
-
+@deprecated("Use the DataSource V2 API implementation (see PhoenixDataSource)")
 class PhoenixRecordWritable(columnMetaDataList: List[ColumnInfo]) extends DBWritable {
   val upsertValues = mutable.ArrayBuffer[Any]()
   val resultMap = mutable.Map[String, AnyRef]()
diff --git a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRelation.scala b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRelation.scala
index 38bf29a923..2f6ea8cee7 100644
--- a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRelation.scala
+++ b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRelation.scala
@@ -19,12 +19,11 @@ package org.apache.phoenix.spark
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.{Row, SQLContext}
-import org.apache.spark.sql.sources._
-import org.apache.phoenix.util.StringUtil.escapeStringConstant
-import org.apache.phoenix.util.SchemaUtil
 
+@deprecated("Use the DataSource V2 API implementation (see PhoenixDataSource)")
 case class PhoenixRelation(tableName: String, zkUrl: String, dateAsTimestamp: Boolean = false)(@transient val sqlContext: SQLContext)
     extends BaseRelation with PrunedFilteredScan {
 
@@ -36,7 +35,7 @@ case class PhoenixRelation(tableName: String, zkUrl: String, dateAsTimestamp: Bo
     but this prevents having to load the whole table into Spark first.
   */
   override def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = {
-    val(pushedFilters, unhandledFilters) = buildFilter(filters)
+    val(pushedFilters, _, _) = new FilterExpressionCompiler().pushFilters(filters)
     new PhoenixRDD(
       sqlContext.sparkContext,
       tableName,
@@ -61,71 +60,10 @@ case class PhoenixRelation(tableName: String, zkUrl: String, dateAsTimestamp: Bo
     ).toDataFrame(sqlContext).schema
   }
 
-  // Attempt to create Phoenix-accepted WHERE clauses from Spark filters,
-  // mostly inspired from Spark SQL JDBCRDD and the couchbase-spark-connector
-  private def buildFilter(filters: Array[Filter]): (String, Array[Filter]) = {
-    if (filters.isEmpty) {
-      return ("" , Array[Filter]())
-    }
-
-    val filter = new StringBuilder("")
-    val unsupportedFilters = Array[Filter]();
-    var i = 0
-
-    filters.foreach(f => {
-      // Assume conjunction for multiple filters, unless otherwise specified
-      if (i > 0) {
-        filter.append(" AND")
-      }
-
-      f match {
-        // Spark 1.3.1+ supported filters
-        case And(leftFilter, rightFilter) => filter.append(buildFilter(Array(leftFilter, rightFilter)))
-        case Or(leftFilter, rightFilter) => filter.append(buildFilter(Array(leftFilter)) + " OR " + buildFilter(Array(rightFilter)))
-        case Not(aFilter) => filter.append(" NOT " + buildFilter(Array(aFilter)))
-        case EqualTo(attr, value) => filter.append(s" ${escapeKey(attr)} = ${compileValue(value)}")
-        case GreaterThan(attr, value) => filter.append(s" ${escapeKey(attr)} > ${compileValue(value)}")
-        case GreaterThanOrEqual(attr, value) => filter.append(s" ${escapeKey(attr)} >= ${compileValue(value)}")
-        case LessThan(attr, value) => filter.append(s" ${escapeKey(attr)} < ${compileValue(value)}")
-        case LessThanOrEqual(attr, value) => filter.append(s" ${escapeKey(attr)} <= ${compileValue(value)}")
-        case IsNull(attr) => filter.append(s" ${escapeKey(attr)} IS NULL")
-        case IsNotNull(attr) => filter.append(s" ${escapeKey(attr)} IS NOT NULL")
-        case In(attr, values) => filter.append(s" ${escapeKey(attr)} IN ${values.map(compileValue).mkString("(", ",", ")")}")
-        case StringStartsWith(attr, value) => filter.append(s" ${escapeKey(attr)} LIKE ${compileValue(value + "%")}")
-        case StringEndsWith(attr, value) => filter.append(s" ${escapeKey(attr)} LIKE ${compileValue("%" + value)}")
-        case StringContains(attr, value) => filter.append(s" ${escapeKey(attr)} LIKE ${compileValue("%" + value + "%")}")
-        case _ => unsupportedFilters :+ f
-      }
-
-      i = i + 1
-    })
-
-    (filter.toString(), unsupportedFilters)
-  }
 
   override def unhandledFilters(filters: Array[Filter]): Array[Filter] = {
-    val(pushedFilters, unhandledFilters) = buildFilter(filters)
+    val (_, unhandledFilters, _) = new FilterExpressionCompiler().pushFilters(filters)
     unhandledFilters
   }
 
-  // Helper function to escape column key to work with SQL queries
-  private def escapeKey(key: String): String = SchemaUtil.getEscapedArgument(key)
-
-  // Helper function to escape string values in SQL queries
-  private def compileValue(value: Any): Any = value match {
-    case stringValue: String => s"'${escapeStringConstant(stringValue)}'"
-
-    // Borrowed from 'elasticsearch-hadoop', support these internal UTF types across Spark versions
-    // Spark 1.4
-    case utf if (isClass(utf, "org.apache.spark.sql.types.UTF8String")) => s"'${escapeStringConstant(utf.toString)}'"
-    // Spark 1.5
-    case utf if (isClass(utf, "org.apache.spark.unsafe.types.UTF8String")) => s"'${escapeStringConstant(utf.toString)}'"
-
-    // Pass through anything else
-    case _ => value
-  }
-
-  private def isClass(obj: Any, className: String) = {
-    className.equals(obj.getClass().getName())
-  }
 }
diff --git a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/ProductRDDFunctions.scala b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/ProductRDDFunctions.scala
index 1b33e6e35d..b0735218c6 100644
--- a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/ProductRDDFunctions.scala
+++ b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/ProductRDDFunctions.scala
@@ -21,6 +21,7 @@ import org.apache.spark.rdd.RDD
 
 import scala.collection.JavaConversions._
 
+@deprecated("Use the DataSource V2 API implementation (see PhoenixDataSource)")
 class ProductRDDFunctions[A <: Product](data: RDD[A]) extends Serializable {
 
   def saveToPhoenix(tableName: String, cols: Seq[String],
diff --git a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/SparkContextFunctions.scala b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/SparkContextFunctions.scala
index 476ce8a7fc..1b377abe3b 100644
--- a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/SparkContextFunctions.scala
+++ b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/SparkContextFunctions.scala
@@ -17,6 +17,7 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.spark.SparkContext
 import org.apache.spark.rdd.RDD
 
+@deprecated("Use the DataSource V2 API implementation (see PhoenixDataSource)")
 class SparkContextFunctions(@transient val sc: SparkContext) extends Serializable {
 
   /*
diff --git a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/SparkSchemaUtil.scala b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/SparkSchemaUtil.scala
new file mode 100644
index 0000000000..f69e988246
--- /dev/null
+++ b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/SparkSchemaUtil.scala
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.spark
+
+import org.apache.phoenix.query.QueryConstants
+import org.apache.phoenix.schema.types._
+import org.apache.phoenix.util.{ColumnInfo, SchemaUtil}
+import org.apache.spark.sql.types._
+
+object SparkSchemaUtil {
+
+  def phoenixSchemaToCatalystSchema(columnList: Seq[ColumnInfo], dateAsTimestamp: Boolean = false) : StructType = {
+    val structFields = columnList.map(ci => {
+      val structType = phoenixTypeToCatalystType(ci, dateAsTimestamp)
+      StructField(normalizeColumnName(ci.getColumnName), structType)
+    })
+    new StructType(structFields.toArray)
+  }
+
+  def normalizeColumnName(columnName: String) = {
+    val unescapedColumnName = SchemaUtil.getUnEscapedFullColumnName(columnName)
+    var normalizedColumnName = ""
+    if (unescapedColumnName.indexOf(QueryConstants.NAME_SEPARATOR) < 0) {
+      normalizedColumnName = unescapedColumnName
+    }
+    else {
+      // split by separator to get the column family and column name
+      val tokens = unescapedColumnName.split(QueryConstants.NAME_SEPARATOR_REGEX)
+      normalizedColumnName = if (tokens(0) == "0") tokens(1) else unescapedColumnName
+    }
+    normalizedColumnName
+  }
+
+
+  // Lookup table for Phoenix types to Spark catalyst types
+  def phoenixTypeToCatalystType(columnInfo: ColumnInfo, dateAsTimestamp: Boolean): DataType = columnInfo.getPDataType match {
+    case t if t.isInstanceOf[PVarchar] || t.isInstanceOf[PChar] => StringType
+    case t if t.isInstanceOf[PLong] || t.isInstanceOf[PUnsignedLong] => LongType
+    case t if t.isInstanceOf[PInteger] || t.isInstanceOf[PUnsignedInt] => IntegerType
+    case t if t.isInstanceOf[PSmallint] || t.isInstanceOf[PUnsignedSmallint] => ShortType
+    case t if t.isInstanceOf[PTinyint] || t.isInstanceOf[PUnsignedTinyint] => ByteType
+    case t if t.isInstanceOf[PFloat] || t.isInstanceOf[PUnsignedFloat] => FloatType
+    case t if t.isInstanceOf[PDouble] || t.isInstanceOf[PUnsignedDouble] => DoubleType
+    // Use Spark system default precision for now (explicit to work with < 1.5)
+    case t if t.isInstanceOf[PDecimal] =>
+      if (columnInfo.getPrecision == null || columnInfo.getPrecision < 0) DecimalType(38, 18) else DecimalType(columnInfo.getPrecision, columnInfo.getScale)
+    case t if t.isInstanceOf[PTimestamp] || t.isInstanceOf[PUnsignedTimestamp] => TimestampType
+    case t if t.isInstanceOf[PTime] || t.isInstanceOf[PUnsignedTime] => TimestampType
+    case t if (t.isInstanceOf[PDate] || t.isInstanceOf[PUnsignedDate]) && !dateAsTimestamp => DateType
+    case t if (t.isInstanceOf[PDate] || t.isInstanceOf[PUnsignedDate]) && dateAsTimestamp => TimestampType
+    case t if t.isInstanceOf[PBoolean] => BooleanType
+    case t if t.isInstanceOf[PVarbinary] || t.isInstanceOf[PBinary] => BinaryType
+    case t if t.isInstanceOf[PIntegerArray] || t.isInstanceOf[PUnsignedIntArray] => ArrayType(IntegerType, containsNull = true)
+    case t if t.isInstanceOf[PBooleanArray] => ArrayType(BooleanType, containsNull = true)
+    case t if t.isInstanceOf[PVarcharArray] || t.isInstanceOf[PCharArray] => ArrayType(StringType, containsNull = true)
+    case t if t.isInstanceOf[PVarbinaryArray] || t.isInstanceOf[PBinaryArray] => ArrayType(BinaryType, containsNull = true)
+    case t if t.isInstanceOf[PLongArray] || t.isInstanceOf[PUnsignedLongArray] => ArrayType(LongType, containsNull = true)
+    case t if t.isInstanceOf[PSmallintArray] || t.isInstanceOf[PUnsignedSmallintArray] => ArrayType(IntegerType, containsNull = true)
+    case t if t.isInstanceOf[PTinyintArray] || t.isInstanceOf[PUnsignedTinyintArray] => ArrayType(ByteType, containsNull = true)
+    case t if t.isInstanceOf[PFloatArray] || t.isInstanceOf[PUnsignedFloatArray] => ArrayType(FloatType, containsNull = true)
+    case t if t.isInstanceOf[PDoubleArray] || t.isInstanceOf[PUnsignedDoubleArray] => ArrayType(DoubleType, containsNull = true)
+    case t if t.isInstanceOf[PDecimalArray] => ArrayType(
+      if (columnInfo.getPrecision == null || columnInfo.getPrecision < 0) DecimalType(38, 18) else DecimalType(columnInfo.getPrecision, columnInfo.getScale), containsNull = true)
+    case t if t.isInstanceOf[PTimestampArray] || t.isInstanceOf[PUnsignedTimestampArray] => ArrayType(TimestampType, containsNull = true)
+    case t if t.isInstanceOf[PDateArray] || t.isInstanceOf[PUnsignedDateArray] => ArrayType(TimestampType, containsNull = true)
+    case t if t.isInstanceOf[PTimeArray] || t.isInstanceOf[PUnsignedTimeArray] => ArrayType(TimestampType, containsNull = true)
+  }
+
+}
diff --git a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/SparkSqlContextFunctions.scala b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/SparkSqlContextFunctions.scala
index a0842c9a31..f9154adf86 100644
--- a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/SparkSqlContextFunctions.scala
+++ b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/SparkSqlContextFunctions.scala
@@ -16,6 +16,7 @@ package org.apache.phoenix.spark
 import org.apache.hadoop.conf.Configuration
 import org.apache.spark.sql.{DataFrame, SQLContext}
 
+@deprecated("Use the DataSource V2 API implementation (see PhoenixDataSource)")
 class SparkSqlContextFunctions(@transient val sqlContext: SQLContext) extends Serializable {
 
   /*
diff --git a/phoenix-spark/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/PhoenixJdbcDialect.scala b/phoenix-spark/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/PhoenixJdbcDialect.scala
new file mode 100644
index 0000000000..712ec2d840
--- /dev/null
+++ b/phoenix-spark/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/PhoenixJdbcDialect.scala
@@ -0,0 +1,21 @@
+package org.apache.spark.sql.execution.datasources.jdbc
+
+import org.apache.spark.sql.jdbc.{JdbcDialect, JdbcType}
+import org.apache.spark.sql.types._
+
+private object PhoenixJdbcDialect  extends JdbcDialect {
+
+  override def canHandle(url: String): Boolean = url.startsWith("jdbc:phoenix")
+
+  /**
+    * This is only called for ArrayType (see JdbcUtils.makeSetter)
+    */
+  override def getJDBCType(dt: DataType): Option[JdbcType] = dt match {
+    case StringType => Some(JdbcType("VARCHAR", java.sql.Types.VARCHAR))
+    case BinaryType => Some(JdbcType("BINARY(" + dt.defaultSize + ")", java.sql.Types.BINARY))
+    case ByteType => Some(JdbcType("TINYINT", java.sql.Types.TINYINT))
+    case _ => None
+  }
+
+
+}
\ No newline at end of file
diff --git a/phoenix-spark/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/SparkJdbcUtil.scala b/phoenix-spark/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/SparkJdbcUtil.scala
new file mode 100644
index 0000000000..eac483ae8d
--- /dev/null
+++ b/phoenix-spark/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/SparkJdbcUtil.scala
@@ -0,0 +1,309 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+import java.sql.{Connection, PreparedStatement, ResultSet}
+import java.util.Locale
+
+import org.apache.spark.executor.InputMetrics
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.encoders.RowEncoder
+import org.apache.spark.sql.catalyst.expressions.SpecificInternalRow
+import org.apache.spark.sql.catalyst.util.{DateTimeUtils, GenericArrayData}
+import org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils
+import org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils._
+import org.apache.spark.sql.jdbc.{JdbcDialect, JdbcType}
+import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.UTF8String
+import org.apache.spark.util.NextIterator
+
+object SparkJdbcUtil {
+
+  def toRow(schema: StructType, internalRow: InternalRow) : Row = {
+    val encoder = RowEncoder(schema).resolveAndBind()
+    encoder.fromRow(internalRow)
+  }
+
+  // A `JDBCValueGetter` is responsible for getting a value from `ResultSet` into a field
+  // for `MutableRow`. The last argument `Int` means the index for the value to be set in
+  // the row and also used for the value in `ResultSet`.
+  private type JDBCValueGetter = (ResultSet, InternalRow, Int) => Unit
+
+  private def nullSafeConvert[T](input: T, f: T => Any): Any = {
+    if (input == null) {
+      null
+    } else {
+      f(input)
+    }
+  }
+
+  /**
+    * Creates `JDBCValueGetter`s according to [[StructType]], which can set
+    * each value from `ResultSet` to each field of [[InternalRow]] correctly.
+    */
+  private def makeGetters(schema: StructType): Array[JDBCValueGetter] =
+    schema.fields.map(sf => makeGetter(sf.dataType, sf.metadata))
+
+  private def makeGetter(dt: DataType, metadata: Metadata): JDBCValueGetter = dt match {
+    case BooleanType =>
+      (rs: ResultSet, row: InternalRow, pos: Int) =>
+        row.setBoolean(pos, rs.getBoolean(pos + 1))
+
+    case DateType =>
+      (rs: ResultSet, row: InternalRow, pos: Int) =>
+        // DateTimeUtils.fromJavaDate does not handle null value, so we need to check it.
+        val dateVal = rs.getDate(pos + 1)
+        if (dateVal != null) {
+          row.setInt(pos, DateTimeUtils.fromJavaDate(dateVal))
+        } else {
+          row.update(pos, null)
+        }
+
+    // When connecting with Oracle DB through JDBC, the precision and scale of BigDecimal
+    // object returned by ResultSet.getBigDecimal is not correctly matched to the table
+    // schema reported by ResultSetMetaData.getPrecision and ResultSetMetaData.getScale.
+    // If inserting values like 19999 into a column with NUMBER(12, 2) type, you get through
+    // a BigDecimal object with scale as 0. But the dataframe schema has correct type as
+    // DecimalType(12, 2). Thus, after saving the dataframe into parquet file and then
+    // retrieve it, you will get wrong result 199.99.
+    // So it is needed to set precision and scale for Decimal based on JDBC metadata.
+    case DecimalType.Fixed(p, s) =>
+      (rs: ResultSet, row: InternalRow, pos: Int) =>
+        val decimal =
+          nullSafeConvert[java.math.BigDecimal](rs.getBigDecimal(pos + 1), d => Decimal(d, p, s))
+        row.update(pos, decimal)
+
+    case DoubleType =>
+      (rs: ResultSet, row: InternalRow, pos: Int) =>
+        row.setDouble(pos, rs.getDouble(pos + 1))
+
+    case FloatType =>
+      (rs: ResultSet, row: InternalRow, pos: Int) =>
+        row.setFloat(pos, rs.getFloat(pos + 1))
+
+    case IntegerType =>
+      (rs: ResultSet, row: InternalRow, pos: Int) =>
+        row.setInt(pos, rs.getInt(pos + 1))
+
+    case LongType if metadata.contains("binarylong") =>
+      (rs: ResultSet, row: InternalRow, pos: Int) =>
+        val bytes = rs.getBytes(pos + 1)
+        var ans = 0L
+        var j = 0
+        while (j < bytes.length) {
+          ans = 256 * ans + (255 & bytes(j))
+          j = j + 1
+        }
+        row.setLong(pos, ans)
+
+    case LongType =>
+      (rs: ResultSet, row: InternalRow, pos: Int) =>
+        row.setLong(pos, rs.getLong(pos + 1))
+
+    case ShortType =>
+      (rs: ResultSet, row: InternalRow, pos: Int) =>
+        row.setShort(pos, rs.getShort(pos + 1))
+
+    case StringType =>
+      (rs: ResultSet, row: InternalRow, pos: Int) =>
+        // TODO(davies): use getBytes for better performance, if the encoding is UTF-8
+        row.update(pos, UTF8String.fromString(rs.getString(pos + 1)))
+
+    case TimestampType =>
+      (rs: ResultSet, row: InternalRow, pos: Int) =>
+        val t = rs.getTimestamp(pos + 1)
+        if (t != null) {
+          row.setLong(pos, DateTimeUtils.fromJavaTimestamp(t))
+        } else {
+          row.update(pos, null)
+        }
+
+    case BinaryType =>
+      (rs: ResultSet, row: InternalRow, pos: Int) =>
+        row.update(pos, rs.getBytes(pos + 1))
+
+    case ByteType =>
+      (rs: ResultSet, row: InternalRow, pos: Int) =>
+        row.update(pos, rs.getByte(pos + 1))
+
+    case ArrayType(et, _) =>
+      val elementConversion = et match {
+        case TimestampType =>
+          (array: Object) =>
+            array.asInstanceOf[Array[java.sql.Timestamp]].map { timestamp =>
+              nullSafeConvert(timestamp, DateTimeUtils.fromJavaTimestamp)
+            }
+
+        case StringType =>
+          (array: Object) =>
+            // some underling types are not String such as uuid, inet, cidr, etc.
+            array.asInstanceOf[Array[java.lang.Object]]
+              .map(obj => if (obj == null) null else UTF8String.fromString(obj.toString))
+
+        case DateType =>
+          (array: Object) =>
+            array.asInstanceOf[Array[java.sql.Date]].map { date =>
+              nullSafeConvert(date, DateTimeUtils.fromJavaDate)
+            }
+
+        case dt: DecimalType =>
+          (array: Object) =>
+            array.asInstanceOf[Array[java.math.BigDecimal]].map { decimal =>
+              nullSafeConvert[java.math.BigDecimal](
+                decimal, d => Decimal(d, dt.precision, dt.scale))
+            }
+
+        case LongType if metadata.contains("binarylong") =>
+          throw new IllegalArgumentException(s"Unsupported array element " +
+            s"type ${dt.catalogString} based on binary")
+
+        case ArrayType(_, _) =>
+          throw new IllegalArgumentException("Nested arrays unsupported")
+
+        case _ => (array: Object) => array.asInstanceOf[Array[Any]]
+      }
+
+      (rs: ResultSet, row: InternalRow, pos: Int) =>
+        val array = nullSafeConvert[java.sql.Array](
+          input = rs.getArray(pos + 1),
+          array => new GenericArrayData(elementConversion.apply(array.getArray)))
+        row.update(pos, array)
+
+    case _ => throw new IllegalArgumentException(s"Unsupported type ${dt.catalogString}")
+  }
+
+  // TODO just use JdbcUtils.resultSetToSparkInternalRows in Spark 3.0 (see SPARK-26499)
+  def resultSetToSparkInternalRows(
+                                    resultSet: ResultSet,
+                                    schema: StructType,
+                                    inputMetrics: InputMetrics): Iterator[InternalRow] = {
+    // JdbcUtils.resultSetToSparkInternalRows(resultSet, schema, inputMetrics)
+    new NextIterator[InternalRow] {
+      private[this] val rs = resultSet
+      private[this] val getters: Array[JDBCValueGetter] = makeGetters(schema)
+      private[this] val mutableRow = new SpecificInternalRow(schema.fields.map(x => x.dataType))
+
+      override protected def close(): Unit = {
+        try {
+          rs.close()
+        } catch {
+          case e: Exception =>
+        }
+      }
+
+      override protected def getNext(): InternalRow = {
+        if (rs.next()) {
+          inputMetrics.incRecordsRead(1)
+          var i = 0
+          while (i < getters.length) {
+            getters(i).apply(rs, mutableRow, i)
+            if (rs.wasNull) mutableRow.setNullAt(i)
+            i = i + 1
+          }
+          mutableRow
+        } else {
+          finished = true
+          null.asInstanceOf[InternalRow]
+        }
+      }
+    }
+  }
+
+  // A `JDBCValueSetter` is responsible for setting a value from `Row` into a field for
+  // `PreparedStatement`. The last argument `Int` means the index for the value to be set
+  // in the SQL statement and also used for the value in `Row`.
+  private type JDBCValueSetter = (PreparedStatement, Row, Int) => Unit
+
+  // take from Spark JdbcUtils.scala, cannot be used directly because the method is private
+  def makeSetter(
+                  conn: Connection,
+                  dialect: JdbcDialect,
+                  dataType: DataType): JDBCValueSetter = dataType match {
+    case IntegerType =>
+      (stmt: PreparedStatement, row: Row, pos: Int) =>
+        stmt.setInt(pos + 1, row.getInt(pos))
+
+    case LongType =>
+      (stmt: PreparedStatement, row: Row, pos: Int) =>
+        stmt.setLong(pos + 1, row.getLong(pos))
+
+    case DoubleType =>
+      (stmt: PreparedStatement, row: Row, pos: Int) =>
+        stmt.setDouble(pos + 1, row.getDouble(pos))
+
+    case FloatType =>
+      (stmt: PreparedStatement, row: Row, pos: Int) =>
+        stmt.setFloat(pos + 1, row.getFloat(pos))
+
+    case ShortType =>
+      (stmt: PreparedStatement, row: Row, pos: Int) =>
+        stmt.setInt(pos + 1, row.getShort(pos))
+
+    case ByteType =>
+      (stmt: PreparedStatement, row: Row, pos: Int) =>
+        stmt.setInt(pos + 1, row.getByte(pos))
+
+    case BooleanType =>
+      (stmt: PreparedStatement, row: Row, pos: Int) =>
+        stmt.setBoolean(pos + 1, row.getBoolean(pos))
+
+    case StringType =>
+      (stmt: PreparedStatement, row: Row, pos: Int) =>
+        stmt.setString(pos + 1, row.getString(pos))
+
+    case BinaryType =>
+      (stmt: PreparedStatement, row: Row, pos: Int) =>
+        stmt.setBytes(pos + 1, row.getAs[Array[Byte]](pos))
+
+    case TimestampType =>
+      (stmt: PreparedStatement, row: Row, pos: Int) =>
+        stmt.setTimestamp(pos + 1, row.getAs[java.sql.Timestamp](pos))
+
+    case DateType =>
+      (stmt: PreparedStatement, row: Row, pos: Int) =>
+        stmt.setDate(pos + 1, row.getAs[java.sql.Date](pos))
+
+    case t: DecimalType =>
+      (stmt: PreparedStatement, row: Row, pos: Int) =>
+        stmt.setBigDecimal(pos + 1, row.getDecimal(pos))
+
+    case ArrayType(et, _) =>
+      // remove type length parameters from end of type name
+      val typeName = getJdbcType(et, dialect).databaseTypeDefinition
+        .toLowerCase(Locale.ROOT).split("\\(")(0)
+      (stmt: PreparedStatement, row: Row, pos: Int) =>
+        val array = conn.createArrayOf(
+          typeName,
+          row.getSeq[AnyRef](pos).toArray)
+        stmt.setArray(pos + 1, array)
+
+    case _ =>
+      (_: PreparedStatement, _: Row, pos: Int) =>
+        throw new IllegalArgumentException(
+          s"Can't translate non-null value for field $pos")
+  }
+
+  // taken from Spark JdbcUtils
+  def getJdbcType(dt: DataType, dialect: JdbcDialect): JdbcType = {
+    dialect.getJDBCType(dt).orElse(getCommonJDBCType(dt)).getOrElse(
+      throw new IllegalArgumentException(s"Can't get JDBC type for ${dt.catalogString}"))
+  }
+
+}
diff --git a/pom.xml b/pom.xml
index e817200d2c..70b3eb1d97 100644
--- a/pom.xml
+++ b/pom.xml
@@ -102,7 +102,7 @@
     <jettyVersion>8.1.7.v20120910</jettyVersion>
     <tephra.version>0.15.0-incubating</tephra.version>
     <omid.version>1.0.0</omid.version>
-    <spark.version>2.3.2</spark.version>
+    <spark.version>2.4.0</spark.version>
     <scala.version>2.11.8</scala.version>
     <scala.binary.version>2.11</scala.binary.version>
     <stream.version>2.9.5</stream.version>


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services