Posted to commits@phoenix.apache.org by ch...@apache.org on 2019/11/27 03:15:21 UTC

svn commit: r1870483 - in /phoenix/site: publish/phoenix_spark.html source/src/site/markdown/phoenix_spark.md

Author: chinmayskulkarni
Date: Wed Nov 27 03:15:21 2019
New Revision: 1870483

URL: http://svn.apache.org/viewvc?rev=1870483&view=rev
Log:
PHOENIX-5585: Add documentation for Phoenix-Spark Java example

Modified:
    phoenix/site/publish/phoenix_spark.html
    phoenix/site/source/src/site/markdown/phoenix_spark.md

Modified: phoenix/site/publish/phoenix_spark.html
URL: http://svn.apache.org/viewvc/phoenix/site/publish/phoenix_spark.html?rev=1870483&r1=1870482&r2=1870483&view=diff
==============================================================================
--- phoenix/site/publish/phoenix_spark.html (original)
+++ phoenix/site/publish/phoenix_spark.html Wed Nov 27 03:15:21 2019
@@ -226,6 +226,7 @@ UPSERT INTO TABLE1 (ID, COL1) VALUES (2,
   </div> 
   <div class="section"> 
    <h4 id="Load_as_a_DataFrame_using_the_DataSourceV2_API">Load as a DataFrame using the DataSourceV2 API</h4> 
+   <p>Scala example: </p> 
    <div class="source"> 
     <pre>import org.apache.spark.SparkContext
 import org.apache.spark.sql.{SQLContext, SparkSession}
@@ -249,6 +250,40 @@ df.filter(df(&quot;COL1&quot;) === &quot
   .show
 </pre> 
    </div> 
+   <p>Java example: </p> 
+   <div class="source"> 
+    <pre>import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SQLContext;
+
+import static org.apache.phoenix.spark.datasource.v2.PhoenixDataSource.ZOOKEEPER_URL;
+
+public class PhoenixSparkRead {
+    
+    public static void main(String[] args) throws Exception {
+        SparkConf sparkConf = new SparkConf().setMaster(&quot;local&quot;).setAppName(&quot;phoenix-test&quot;);
+        JavaSparkContext jsc = new JavaSparkContext(sparkConf);
+        SQLContext sqlContext = new SQLContext(jsc);
+        
+        // Load data from TABLE1
+        Dataset&lt;Row&gt; df = sqlContext
+            .read()
+            .format(&quot;phoenix&quot;)
+            .option(&quot;table&quot;, &quot;TABLE1&quot;)
+            .option(ZOOKEEPER_URL, &quot;phoenix-server:2181&quot;)
+            .load();
+        df.createOrReplaceTempView(&quot;TABLE1&quot;);
+    
+        df = sqlContext.sql(&quot;SELECT * FROM TABLE1 WHERE COL1='test_row_1' AND ID=1L&quot;);
+        df.show();
+        jsc.stop();
+    }
+}
+</pre> 
+   </div> 
   </div> 
  </div> 
  <div class="section"> 
@@ -263,7 +298,7 @@ df.filter(df(&quot;COL1&quot;) === &quot
 CREATE TABLE OUTPUT_TABLE (id BIGINT NOT NULL PRIMARY KEY, col1 VARCHAR, col2 INTEGER);
 </pre> 
    </div> 
-   <p>you can load from an input table and save to an output table as a DataFrame as follows:</p> 
+   <p>you can load from an input table and save to an output table as a DataFrame as follows in Scala:</p> 
    <div class="source"> 
     <pre>import org.apache.spark.SparkContext
 import org.apache.spark.sql.{SQLContext, SparkSession, SaveMode}
@@ -279,18 +314,55 @@ val spark = SparkSession
 val df = spark.sqlContext
   .read
   .format(&quot;phoenix&quot;)
-  .options(Map(&quot;table&quot; -&gt; &quot;INPUT_TABLE&quot;, PhoenixDataSource.ZOOKEEPER_URL -&gt; &quot;hbaseConnectionString&quot;))
+  .options(Map(&quot;table&quot; -&gt; &quot;INPUT_TABLE&quot;, PhoenixDataSource.ZOOKEEPER_URL -&gt; &quot;phoenix-server:2181&quot;))
   .load
 
 // Save to OUTPUT_TABLE
-df
-  .write
+df.write
   .format(&quot;phoenix&quot;)
   .mode(SaveMode.Overwrite)
-  .options(Map(&quot;table&quot; -&gt; &quot;INPUT_TABLE&quot;, PhoenixDataSource.ZOOKEEPER_URL -&gt; &quot;hbaseConnectionString&quot;))
+  .options(Map(&quot;table&quot; -&gt; &quot;OUTPUT_TABLE&quot;, PhoenixDataSource.ZOOKEEPER_URL -&gt; &quot;phoenix-server:2181&quot;))
   .save()
 </pre> 
    </div> 
+   <p>Java example: </p> 
+   <div class="source"> 
+    <pre>import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SaveMode;
+import org.apache.spark.sql.SQLContext;
+
+import static org.apache.phoenix.spark.datasource.v2.PhoenixDataSource.ZOOKEEPER_URL;
+
+public class PhoenixSparkWriteFromInputTable {
+    
+    public static void main(String[] args) throws Exception {
+        SparkConf sparkConf = new SparkConf().setMaster(&quot;local&quot;).setAppName(&quot;phoenix-test&quot;);
+        JavaSparkContext jsc = new JavaSparkContext(sparkConf);
+        SQLContext sqlContext = new SQLContext(jsc);
+        
+        // Load INPUT_TABLE
+        Dataset&lt;Row&gt; df = sqlContext
+            .read()
+            .format(&quot;phoenix&quot;)
+            .option(&quot;table&quot;, &quot;INPUT_TABLE&quot;)
+            .option(ZOOKEEPER_URL, &quot;phoenix-server:2181&quot;)
+            .load();
+        
+        // Save to OUTPUT_TABLE
+        df.write()
+          .format(&quot;phoenix&quot;)
+          .mode(SaveMode.Overwrite)
+          .option(&quot;table&quot;, &quot;OUTPUT_TABLE&quot;)
+          .option(ZOOKEEPER_URL, &quot;phoenix-server:2181&quot;)
+          .save();
+        jsc.stop();
+    }
+}
+</pre> 
+   </div> 
   </div> 
   <div class="section"> 
    <h4 id="Save_from_an_external_RDD_with_a_schema_to_a_Phoenix_table">Save from an external RDD with a schema to a Phoenix table</h4> 
@@ -301,7 +373,7 @@ df
     <pre>CREATE TABLE OUTPUT_TABLE (id BIGINT NOT NULL PRIMARY KEY, col1 VARCHAR, col2 INTEGER);
 </pre> 
    </div> 
-   <p>you can save a dataframe from an RDD as follows: </p> 
+   <p>you can save a dataframe from an RDD as follows in Scala: </p> 
    <div class="source"> 
     <pre>import org.apache.spark.SparkContext
 import org.apache.spark.sql.types.{IntegerType, LongType, StringType, StructType, StructField}
@@ -328,11 +400,66 @@ val df = spark.sqlContext.createDataFram
 
 df.write
   .format(&quot;phoenix&quot;)
-  .options(Map(&quot;table&quot; -&gt; &quot;OUTPUT_TEST_TABLE&quot;, PhoenixDataSource.ZOOKEEPER_URL -&gt; &quot;quorumAddress&quot;))
+  .options(Map(&quot;table&quot; -&gt; &quot;OUTPUT_TABLE&quot;, PhoenixDataSource.ZOOKEEPER_URL -&gt; &quot;phoenix-server:2181&quot;))
   .mode(SaveMode.Overwrite)
   .save()
 </pre> 
    </div> 
+   <p>Java example: </p> 
+   <div class="source"> 
+    <pre>import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.RowFactory;
+import org.apache.spark.sql.SaveMode;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.SQLContext;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.phoenix.spark.datasource.v2.PhoenixDataSource.ZOOKEEPER_URL;
+
+public class PhoenixSparkWriteFromRDDWithSchema {
+ 
+    public static void main(String[] args) throws Exception {
+        SparkConf sparkConf = new SparkConf().setMaster(&quot;local&quot;).setAppName(&quot;phoenix-test&quot;);
+        JavaSparkContext jsc = new JavaSparkContext(sparkConf);
+        SQLContext sqlContext = new SQLContext(jsc);
+        SparkSession spark = sqlContext.sparkSession();
+        Dataset&lt;Row&gt; df;
+  
+        // Generate the schema based on the fields
+        List&lt;StructField&gt; fields = new ArrayList&lt;&gt;();
+        fields.add(DataTypes.createStructField(&quot;ID&quot;, DataTypes.LongType, false));
+        fields.add(DataTypes.createStructField(&quot;COL1&quot;, DataTypes.StringType, true));
+        fields.add(DataTypes.createStructField(&quot;COL2&quot;, DataTypes.IntegerType, true));
+        StructType schema = DataTypes.createStructType(fields);
+  
+        // Generate the rows with the same exact schema
+        List&lt;Row&gt; rows = new ArrayList&lt;&gt;();
+        for (int i = 1; i &lt; 4; i++) {
+            rows.add(RowFactory.create(Long.valueOf(i), String.valueOf(i), i));
+        }
+  
+        // Create a DataFrame from the rows and the specified schema
+        df = spark.createDataFrame(rows, schema);
+        df.write()
+            .format(&quot;phoenix&quot;)
+            .mode(SaveMode.Overwrite)
+            .option(&quot;table&quot;, &quot;OUTPUT_TABLE&quot;)
+            .option(ZOOKEEPER_URL, &quot;phoenix-server:2181&quot;)
+            .save();
+  
+        jsc.stop();
+    }
+}
+</pre> 
+   </div> 
   </div> 
  </div> 
  <div class="section"> 
@@ -340,12 +467,12 @@ df.write
   <p>With Spark’s DataFrame support, you can also use <tt>pyspark</tt> to read and write from Phoenix tables.</p> 
   <div class="section"> 
    <h4 id="Load_a_DataFrame">Load a DataFrame</h4> 
-   <p>Given a table <i>TABLE1</i> and a Zookeeper url of <tt>localhost:2181</tt> you can load the table as a DataFrame using the following Python code in <tt>pyspark</tt></p> 
+   <p>Given a table <i>TABLE1</i> and a Zookeeper url of <tt>phoenix-server:2181</tt> you can load the table as a DataFrame using the following Python code in <tt>pyspark</tt></p> 
    <div class="source"> 
     <pre>df = sqlContext.read \
   .format(&quot;phoenix&quot;) \
   .option(&quot;table&quot;, &quot;TABLE1&quot;) \
-  .option(&quot;zkUrl&quot;, &quot;localhost:2181&quot;) \
+  .option(&quot;zkUrl&quot;, &quot;phoenix-server:2181&quot;) \
   .load()
 </pre> 
    </div> 
@@ -358,7 +485,7 @@ df.write
   .format(&quot;phoenix&quot;) \
   .mode(&quot;overwrite&quot;) \
   .option(&quot;table&quot;, &quot;TABLE1&quot;) \
-  .option(&quot;zkUrl&quot;, &quot;localhost:2181&quot;) \
+  .option(&quot;zkUrl&quot;, &quot;phoenix-server:2181&quot;) \
   .save()
 </pre> 
    </div> 
@@ -377,7 +504,7 @@ df.write
   .sqlContext
   .read
   .format(&quot;phoenix&quot;)
-  .options(Map(&quot;table&quot; -&gt; &quot;Table1&quot;, &quot;zkUrl&quot; -&gt; &quot;hosta,hostb,hostc&quot;, 
+  .options(Map(&quot;table&quot; -&gt; &quot;Table1&quot;, &quot;zkUrl&quot; -&gt; &quot;phoenix-server:2181&quot;, 
     &quot;phoenixConfigs&quot; -&gt; &quot;hbase.client.retries.number=10,hbase.client.pause=10000&quot;))
   .load;
 </pre> 

Modified: phoenix/site/source/src/site/markdown/phoenix_spark.md
URL: http://svn.apache.org/viewvc/phoenix/site/source/src/site/markdown/phoenix_spark.md?rev=1870483&r1=1870482&r2=1870483&view=diff
==============================================================================
--- phoenix/site/source/src/site/markdown/phoenix_spark.md (original)
+++ phoenix/site/source/src/site/markdown/phoenix_spark.md Wed Nov 27 03:15:21 2019
@@ -1,7 +1,7 @@
 # Apache Spark Plugin
 
 The phoenix-spark plugin extends Phoenix's MapReduce support to allow Spark to load Phoenix tables
-as RDDs or DataFrames, and enables persisting them back to Phoenix.
+as DataFrames, and enables persisting them back to Phoenix.
 
 #### Prerequisites
 
@@ -28,8 +28,10 @@ The choice of which method to use to acc
 for the Spark executors and drivers, set both '_spark.executor.extraClassPath_' and 
 '_spark.driver.extraClassPath_' in spark-defaults.conf to include the 'phoenix-_`<version>`_-client.jar'
    
-* Note that for Phoenix versions 4.7 and 4.8 you must use the 'phoenix-_`<version>`_-client-spark.jar'. As of Phoenix 4.10, the 'phoenix-_`<version>`_-client.jar' is compiled against Spark 2.x. If compability with Spark 1.x if needed, you must compile Phoenix with the `spark16` maven profile.   
-   
+* Note that for Phoenix versions 4.7 and 4.8 you must use the 'phoenix-_`<version>`_-client-spark.jar'. 
+
+* As of Phoenix 4.10, the 'phoenix-_`<version>`_-client.jar' is compiled against Spark 2.x. If compatibility with Spark 1.x is needed, you must compile Phoenix with the `spark16` maven profile.
+
 * To help your IDE, you can add the following _provided_ dependency to your build:
 
 ```
@@ -41,9 +43,21 @@ for the Spark executors and drivers, set
 </dependency>
 ```
 
+* As of Phoenix 4.15.0, the connectors project will be separated from the main phoenix project (see [phoenix-connectors](https://github.com/apache/phoenix-connectors))
+and will have its own releases. You can add the following dependency in your project:
+
+```
+<dependency>
+  <groupId>org.apache.phoenix</groupId>
+  <artifactId>phoenix-spark</artifactId>
+  <version>${phoenix.connectors.version}</version>
+</dependency>
+```
+The first released connectors jar is `connectors-1.0.0` (replace `phoenix.connectors.version` above with this version).
+
 ### Reading Phoenix Tables
 
-Given a Phoenix table with the following DDL
+Given a Phoenix table with the following DDL and DML:
 
 ```sql
 CREATE TABLE TABLE1 (ID BIGINT NOT NULL PRIMARY KEY, COL1 VARCHAR);
@@ -51,134 +65,251 @@ UPSERT INTO TABLE1 (ID, COL1) VALUES (1,
 UPSERT INTO TABLE1 (ID, COL1) VALUES (2, 'test_row_2');
 ```
 
-#### Load as a DataFrame using the Data Source API
+#### Load as a DataFrame using the DataSourceV2 API
+Scala example:  
+
 ```scala
 import org.apache.spark.SparkContext
-import org.apache.spark.sql.SQLContext
-import org.apache.phoenix.spark._
-
-val sc = new SparkContext("local", "phoenix-test")
-val sqlContext = new SQLContext(sc)
+import org.apache.spark.sql.{SQLContext, SparkSession}
+import org.apache.phoenix.spark.datasource.v2.PhoenixDataSource
 
-val df = sqlContext.load(
-  "org.apache.phoenix.spark",
-  Map("table" -> "TABLE1", "zkUrl" -> "phoenix-server:2181")
-)
+val spark = SparkSession
+  .builder()
+  .appName("phoenix-test")
+  .master("local")
+  .getOrCreate()
+
+// Load data from TABLE1
+val df = spark.sqlContext
+  .read
+  .format("phoenix")
+  .options(Map("table" -> "TABLE1", PhoenixDataSource.ZOOKEEPER_URL -> "phoenix-server:2181"))
+  .load
 
-df
-  .filter(df("COL1") === "test_row_1" && df("ID") === 1L)
+df.filter(df("COL1") === "test_row_1" && df("ID") === 1L)
   .select(df("ID"))
   .show
 ```
 
-#### Load as a DataFrame directly using a Configuration object
-```scala
-import org.apache.hadoop.conf.Configuration
-import org.apache.spark.SparkContext
-import org.apache.spark.sql.SQLContext
-import org.apache.phoenix.spark._
-
-val configuration = new Configuration()
-// Can set Phoenix-specific settings, requires 'hbase.zookeeper.quorum'
-
-val sc = new SparkContext("local", "phoenix-test")
-val sqlContext = new SQLContext(sc)
-
-// Load the columns 'ID' and 'COL1' from TABLE1 as a DataFrame
-val df = sqlContext.phoenixTableAsDataFrame(
-  "TABLE1", Array("ID", "COL1"), conf = configuration
-)
+Java example:  
 
-df.show
+```java
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SQLContext;
+
+import static org.apache.phoenix.spark.datasource.v2.PhoenixDataSource.ZOOKEEPER_URL;
+
+public class PhoenixSparkRead {
+    
+    public static void main(String[] args) throws Exception {
+        SparkConf sparkConf = new SparkConf().setMaster("local").setAppName("phoenix-test");
+        JavaSparkContext jsc = new JavaSparkContext(sparkConf);
+        SQLContext sqlContext = new SQLContext(jsc);
+        
+        // Load data from TABLE1
+        Dataset<Row> df = sqlContext
+            .read()
+            .format("phoenix")
+            .option("table", "TABLE1")
+            .option(ZOOKEEPER_URL, "phoenix-server:2181")
+            .load();
+        df.createOrReplaceTempView("TABLE1");
+    
+        df = sqlContext.sql("SELECT * FROM TABLE1 WHERE COL1='test_row_1' AND ID=1L");
+        df.show();
+        jsc.stop();
+    }
+}
 ```
 
-#### Load as an RDD, using a Zookeeper URL
-```scala
-import org.apache.spark.SparkContext
-import org.apache.spark.sql.SQLContext
-import org.apache.phoenix.spark._
-
-val sc = new SparkContext("local", "phoenix-test")
-
-// Load the columns 'ID' and 'COL1' from TABLE1 as an RDD
-val rdd: RDD[Map[String, AnyRef]] = sc.phoenixTableAsRDD(
-  "TABLE1", Seq("ID", "COL1"), zkUrl = Some("phoenix-server:2181")
-)
+### Saving to Phoenix
 
-rdd.count()
+#### Save DataFrames to Phoenix using DataSourceV2
 
-val firstId = rdd1.first()("ID").asInstanceOf[Long]
-val firstCol = rdd1.first()("COL1").asInstanceOf[String]
-```
+The `save` method on DataFrame allows passing in a data source type. You can use
+`phoenix` for DataSourceV2 and must also pass in a `table` and `zkUrl` parameter to
+specify which table and server to persist the DataFrame to. The column names are derived from
+the DataFrame's schema field names, and must match the Phoenix column names.
 
-### Saving Phoenix
+The `save` method also takes a `SaveMode` option, for which only `SaveMode.Overwrite` is supported.
 
-Given a Phoenix table with the following DDL
+Given two Phoenix tables with the following DDL:
 
 ```sql
-CREATE TABLE OUTPUT_TEST_TABLE (id BIGINT NOT NULL PRIMARY KEY, col1 VARCHAR, col2 INTEGER);
+CREATE TABLE INPUT_TABLE (id BIGINT NOT NULL PRIMARY KEY, col1 VARCHAR, col2 INTEGER);
+CREATE TABLE OUTPUT_TABLE (id BIGINT NOT NULL PRIMARY KEY, col1 VARCHAR, col2 INTEGER);
 ```
-
-#### Saving RDDs
-
-The `saveToPhoenix` method is an implicit method on RDD[Product], or an RDD of Tuples. The data types must
-correspond to one of [the Java types supported by Phoenix](language/datatypes.html).
-
+you can load from an input table and save to an output table as a DataFrame as follows in Scala:
 
 ```scala
 import org.apache.spark.SparkContext
-import org.apache.phoenix.spark._
+import org.apache.spark.sql.{SQLContext, SparkSession, SaveMode}
+import org.apache.phoenix.spark.datasource.v2.PhoenixDataSource
 
-val sc = new SparkContext("local", "phoenix-test")
-val dataSet = List((1L, "1", 1), (2L, "2", 2), (3L, "3", 3))
+val spark = SparkSession
+  .builder()
+  .appName("phoenix-test")
+  .master("local")
+  .getOrCreate()
+  
+// Load INPUT_TABLE
+val df = spark.sqlContext
+  .read
+  .format("phoenix")
+  .options(Map("table" -> "INPUT_TABLE", PhoenixDataSource.ZOOKEEPER_URL -> "phoenix-server:2181"))
+  .load
 
-sc
-  .parallelize(dataSet)
-  .saveToPhoenix(
-    "OUTPUT_TEST_TABLE",
-    Seq("ID","COL1","COL2"),
-    zkUrl = Some("phoenix-server:2181")
-  )
+// Save to OUTPUT_TABLE
+df.write
+  .format("phoenix")
+  .mode(SaveMode.Overwrite)
+  .options(Map("table" -> "OUTPUT_TABLE", PhoenixDataSource.ZOOKEEPER_URL -> "phoenix-server:2181"))
+  .save()
 ```
 
-#### Saving DataFrames
+Java example:  
 
-The `save` is method on DataFrame allows passing in a data source type. You can use
-`org.apache.phoenix.spark`, and must also pass in a `table` and `zkUrl` parameter to
-specify which table and server to persist the DataFrame to. The column names are derived from
-the DataFrame's schema field names, and must match the Phoenix column names.
+```java
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SaveMode;
+import org.apache.spark.sql.SQLContext;
+
+import static org.apache.phoenix.spark.datasource.v2.PhoenixDataSource.ZOOKEEPER_URL;
+
+public class PhoenixSparkWriteFromInputTable {
+    
+    public static void main(String[] args) throws Exception {
+        SparkConf sparkConf = new SparkConf().setMaster("local").setAppName("phoenix-test");
+        JavaSparkContext jsc = new JavaSparkContext(sparkConf);
+        SQLContext sqlContext = new SQLContext(jsc);
+        
+        // Load INPUT_TABLE
+        Dataset<Row> df = sqlContext
+            .read()
+            .format("phoenix")
+            .option("table", "INPUT_TABLE")
+            .option(ZOOKEEPER_URL, "phoenix-server:2181")
+            .load();
+        
+        // Save to OUTPUT_TABLE
+        df.write()
+          .format("phoenix")
+          .mode(SaveMode.Overwrite)
+          .option("table", "OUTPUT_TABLE")
+          .option(ZOOKEEPER_URL, "phoenix-server:2181")
+          .save();
+        jsc.stop();
+    }
+}
+```
+
+#### Save from an external RDD with a schema to a Phoenix table
 
-The `save` method also takes a `SaveMode` option, for which only `SaveMode.Overwrite` is supported.
+Just like the previous example, you can pass in the data source type as `phoenix` and specify the `table` and
+`zkUrl` parameters indicating which table and server to persist the DataFrame to.
 
-Given two Phoenix tables with the following DDL:
+Note that the schema of the RDD must match its column data, and both must match the schema of the
+Phoenix table that you save to.
+
+Given an output Phoenix table with the following DDL:
 
 ```sql
-CREATE TABLE INPUT_TABLE (id BIGINT NOT NULL PRIMARY KEY, col1 VARCHAR, col2 INTEGER);
 CREATE TABLE OUTPUT_TABLE (id BIGINT NOT NULL PRIMARY KEY, col1 VARCHAR, col2 INTEGER);
 ```
+you can save a dataframe from an RDD as follows in Scala: 
 
 ```scala
 import org.apache.spark.SparkContext
-import org.apache.spark.sql._
-import org.apache.phoenix.spark._
-
-// Load INPUT_TABLE
-val sc = new SparkContext("local", "phoenix-test")
-val sqlContext = new SQLContext(sc)
-val df = sqlContext.load("org.apache.phoenix.spark", Map("table" -> "INPUT_TABLE",
-  "zkUrl" -> hbaseConnectionString))
-
-// Save to OUTPUT_TABLE
-df.saveToPhoenix(Map("table" -> "OUTPUT_TABLE", "zkUrl" -> hbaseConnectionString))
+import org.apache.spark.sql.types.{IntegerType, LongType, StringType, StructType, StructField}
+import org.apache.spark.sql.{Row, SQLContext, SparkSession, SaveMode}
+import org.apache.phoenix.spark.datasource.v2.PhoenixDataSource
+
+val spark = SparkSession
+  .builder()
+  .appName("phoenix-test")
+  .master("local")
+  .getOrCreate()
+  
+val dataSet = List(Row(1L, "1", 1), Row(2L, "2", 2), Row(3L, "3", 3))
+
+val schema = StructType(
+  Seq(StructField("ID", LongType, nullable = false),
+    StructField("COL1", StringType),
+    StructField("COL2", IntegerType)))
+
+val rowRDD = spark.sparkContext.parallelize(dataSet)
+
+// Apply the schema to the RDD.
+val df = spark.sqlContext.createDataFrame(rowRDD, schema)
+
+df.write
+  .format("phoenix")
+  .options(Map("table" -> "OUTPUT_TABLE", PhoenixDataSource.ZOOKEEPER_URL -> "phoenix-server:2181"))
+  .mode(SaveMode.Overwrite)
+  .save()
+```
 
-or
+Java example:  
 
-df.write \
- .format("org.apache.phoenix.spark") \
- .mode("overwrite") \
- .option("table", "OUTPUT_TABLE") \
- .option("zkUrl", "localhost:2181") \
- .save()
+```java
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.RowFactory;
+import org.apache.spark.sql.SaveMode;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.SQLContext;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.phoenix.spark.datasource.v2.PhoenixDataSource.ZOOKEEPER_URL;
+
+public class PhoenixSparkWriteFromRDDWithSchema {
+ 
+    public static void main(String[] args) throws Exception {
+        SparkConf sparkConf = new SparkConf().setMaster("local").setAppName("phoenix-test");
+        JavaSparkContext jsc = new JavaSparkContext(sparkConf);
+        SQLContext sqlContext = new SQLContext(jsc);
+        SparkSession spark = sqlContext.sparkSession();
+        Dataset<Row> df;
+  
+        // Generate the schema based on the fields
+        List<StructField> fields = new ArrayList<>();
+        fields.add(DataTypes.createStructField("ID", DataTypes.LongType, false));
+        fields.add(DataTypes.createStructField("COL1", DataTypes.StringType, true));
+        fields.add(DataTypes.createStructField("COL2", DataTypes.IntegerType, true));
+        StructType schema = DataTypes.createStructType(fields);
+  
+        // Generate the rows with the same exact schema
+        List<Row> rows = new ArrayList<>();
+        for (int i = 1; i < 4; i++) {
+            rows.add(RowFactory.create(Long.valueOf(i), String.valueOf(i), i));
+        }
+  
+        // Create a DataFrame from the rows and the specified schema
+        df = spark.createDataFrame(rows, schema);
+        df.write()
+            .format("phoenix")
+            .mode(SaveMode.Overwrite)
+            .option("table", "OUTPUT_TABLE")
+            .option(ZOOKEEPER_URL, "phoenix-server:2181")
+            .save();
+  
+        jsc.stop();
+    }
+}
 ```
 
 ### PySpark
@@ -187,14 +318,14 @@ With Spark's DataFrame support, you can
 
 #### Load a DataFrame
 
-Given a table _TABLE1_ and a Zookeeper url of `localhost:2181` you can load the table as a
+Given a table _TABLE1_ and a Zookeeper url of `phoenix-server:2181` you can load the table as a
 DataFrame using the following Python code in `pyspark`
 
 ```python
 df = sqlContext.read \
-  .format("org.apache.phoenix.spark") \
+  .format("phoenix") \
   .option("table", "TABLE1") \
-  .option("zkUrl", "localhost:2181") \
+  .option("zkUrl", "phoenix-server:2181") \
   .load()
 ```
 
@@ -205,22 +336,38 @@ using the following code
 
 ```python
 df.write \
-  .format("org.apache.phoenix.spark") \
+  .format("phoenix") \
   .mode("overwrite") \
   .option("table", "TABLE1") \
-  .option("zkUrl", "localhost:2181") \
+  .option("zkUrl", "phoenix-server:2181") \
   .save()
 ```
 
 
 ### Notes
 
-The functions `phoenixTableAsDataFrame`, `phoenixTableAsRDD` and `saveToPhoenix` all support
+- If you want to use DataSourceV1, you can use the source type `"org.apache.phoenix.spark"` 
+  instead of `"phoenix"`; however, this is deprecated as of `connectors-1.0.0` (see the sketch at
+  the end of the Deprecated Usages section below).
+- The (deprecated) functions `phoenixTableAsDataFrame`, `phoenixTableAsRDD` and `saveToPhoenix` all support
 optionally specifying a `conf` Hadoop configuration parameter with custom Phoenix client settings,
 as well as an optional `zkUrl` parameter for the Phoenix connection URL.
-
-If `zkUrl` isn't specified, it's assumed that the "hbase.zookeeper.quorum" property has been set
+- If `zkUrl` isn't specified, it's assumed that the "hbase.zookeeper.quorum" property has been set
 in the `conf` parameter. Similarly, if no configuration is passed in, `zkUrl` must be specified.
+- As of [PHOENIX-5197](https://issues.apache.org/jira/browse/PHOENIX-5197), you can pass configurations from the driver
+to executors as a comma-separated list against the key `phoenixConfigs` (i.e. PhoenixDataSource.PHOENIX_CONFIGS), for example:
+
+```scala
+df = spark
+  .sqlContext
+  .read
+  .format("phoenix")
+  .options(Map("table" -> "Table1", "zkUrl" -> "phoenix-server:2181", 
+    "phoenixConfigs" -> "hbase.client.retries.number=10,hbase.client.pause=10000"))
+  .load;
+```  
+This list of properties is parsed and populated into a properties map which is passed to `DriverManager.getConnection(connString, propsMap)`.
+Note that the same property values will be used for both the driver and all executors, and
+these configurations are applied each time a connection is made (on both the driver and the executors).
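+
+For parity with the Java examples above, the same option can be passed with `option()` in Java. A
+minimal sketch (the class name `PhoenixSparkReadWithConfigs` is illustrative; the setup and imports
+follow the `PhoenixSparkRead` example earlier on this page):
+
+```java
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SQLContext;
+
+import static org.apache.phoenix.spark.datasource.v2.PhoenixDataSource.ZOOKEEPER_URL;
+
+public class PhoenixSparkReadWithConfigs {
+
+    public static void main(String[] args) throws Exception {
+        SparkConf sparkConf = new SparkConf().setMaster("local").setAppName("phoenix-test");
+        JavaSparkContext jsc = new JavaSparkContext(sparkConf);
+        SQLContext sqlContext = new SQLContext(jsc);
+
+        // Pass HBase/Phoenix client properties from the driver to the executors as a
+        // comma-separated list under the 'phoenixConfigs' option key
+        Dataset<Row> df = sqlContext
+            .read()
+            .format("phoenix")
+            .option("table", "TABLE1")
+            .option(ZOOKEEPER_URL, "phoenix-server:2181")
+            .option("phoenixConfigs", "hbase.client.retries.number=10,hbase.client.pause=10000")
+            .load();
+        df.show();
+        jsc.stop();
+    }
+}
+```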
 
 ### Limitations
 
@@ -281,3 +428,74 @@ saves the results back to Phoenix.
     | 588                                      | 106.11840798585399                       |
     +------------------------------------------+------------------------------------------+
     ```
+***
+
+### Deprecated Usages
+
+#### Load as a DataFrame directly using a Configuration object
+```scala
+import org.apache.hadoop.conf.Configuration
+import org.apache.spark.SparkContext
+import org.apache.spark.sql.SQLContext
+import org.apache.phoenix.spark._
+
+val configuration = new Configuration()
+// Can set Phoenix-specific settings; 'hbase.zookeeper.quorum' is required, e.g.:
+configuration.set("hbase.zookeeper.quorum", "phoenix-server")
+
+val sc = new SparkContext("local", "phoenix-test")
+val sqlContext = new SQLContext(sc)
+
+// Load the columns 'ID' and 'COL1' from TABLE1 as a DataFrame
+val df = sqlContext.phoenixTableAsDataFrame(
+  "TABLE1", Array("ID", "COL1"), conf = configuration
+)
+
+df.show
+```
+
+#### Load as an RDD, using a Zookeeper URL
+```scala
+import org.apache.spark.SparkContext
+import org.apache.spark.sql.SQLContext
+import org.apache.phoenix.spark._
+import org.apache.spark.rdd.RDD
+
+val sc = new SparkContext("local", "phoenix-test")
+
+// Load the columns 'ID' and 'COL1' from TABLE1 as an RDD
+val rdd: RDD[Map[String, AnyRef]] = sc.phoenixTableAsRDD(
+  "TABLE1", Seq("ID", "COL1"), zkUrl = Some("phoenix-server:2181")
+)
+
+rdd.count()
+
+val firstId = rdd.first()("ID").asInstanceOf[Long]
+val firstCol = rdd.first()("COL1").asInstanceOf[String]
+```
+
+#### Saving RDDs to Phoenix
+
+`saveToPhoenix` is an implicit method on RDD[Product], or an RDD of Tuples. The data types must
+correspond to the Java types Phoenix supports (http://phoenix.apache.org/language/datatypes.html)
+
+Given a Phoenix table with the following DDL:
+
+```sql
+CREATE TABLE OUTPUT_TEST_TABLE (id BIGINT NOT NULL PRIMARY KEY, col1 VARCHAR, col2 INTEGER);
+```
+
+```scala
+import org.apache.spark.SparkContext
+import org.apache.phoenix.spark._
+
+val sc = new SparkContext("local", "phoenix-test")
+val dataSet = List((1L, "1", 1), (2L, "2", 2), (3L, "3", 3))
+
+sc
+  .parallelize(dataSet)
+  .saveToPhoenix(
+    "OUTPUT_TEST_TABLE",
+    Seq("ID","COL1","COL2"),
+    zkUrl = Some("phoenix-server:2181")
+  )
+```
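+
+#### Load and save using the deprecated DataSourceV1 source type
+
+A minimal Java sketch of the deprecated `"org.apache.phoenix.spark"` source type mentioned in the
+Notes above, shown for completeness only (prefer the `"phoenix"` DataSourceV2 format). It reuses
+the `INPUT_TABLE`/`OUTPUT_TABLE` tables and server from the examples above; the class name
+`PhoenixSparkDataSourceV1` is illustrative:
+
+```java
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SaveMode;
+import org.apache.spark.sql.SQLContext;
+
+public class PhoenixSparkDataSourceV1 {
+
+    public static void main(String[] args) throws Exception {
+        SparkConf sparkConf = new SparkConf().setMaster("local").setAppName("phoenix-test");
+        JavaSparkContext jsc = new JavaSparkContext(sparkConf);
+        SQLContext sqlContext = new SQLContext(jsc);
+
+        // Load INPUT_TABLE with the deprecated V1 source type and the 'zkUrl' option
+        Dataset<Row> df = sqlContext
+            .read()
+            .format("org.apache.phoenix.spark")
+            .option("table", "INPUT_TABLE")
+            .option("zkUrl", "phoenix-server:2181")
+            .load();
+
+        // Save to OUTPUT_TABLE with the deprecated V1 source type
+        df.write()
+            .format("org.apache.phoenix.spark")
+            .mode(SaveMode.Overwrite)
+            .option("table", "OUTPUT_TABLE")
+            .option("zkUrl", "phoenix-server:2181")
+            .save();
+        jsc.stop();
+    }
+}
+```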