You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ch...@apache.org on 2019/11/27 03:15:21 UTC
svn commit: r1870483 - in /phoenix/site: publish/phoenix_spark.html
source/src/site/markdown/phoenix_spark.md
Author: chinmayskulkarni
Date: Wed Nov 27 03:15:21 2019
New Revision: 1870483
URL: http://svn.apache.org/viewvc?rev=1870483&view=rev
Log:
PHOENIX-5585: Add documentation for Phoenix-Spark Java example
Modified:
phoenix/site/publish/phoenix_spark.html
phoenix/site/source/src/site/markdown/phoenix_spark.md
Modified: phoenix/site/publish/phoenix_spark.html
URL: http://svn.apache.org/viewvc/phoenix/site/publish/phoenix_spark.html?rev=1870483&r1=1870482&r2=1870483&view=diff
==============================================================================
--- phoenix/site/publish/phoenix_spark.html (original)
+++ phoenix/site/publish/phoenix_spark.html Wed Nov 27 03:15:21 2019
@@ -226,6 +226,7 @@ UPSERT INTO TABLE1 (ID, COL1) VALUES (2,
</div>
<div class="section">
<h4 id="Load_as_a_DataFrame_using_the_DataSourceV2_API">Load as a DataFrame using the DataSourceV2 API</h4>
+ <p>Scala example: </p>
<div class="source">
<pre>import org.apache.spark.SparkContext
import org.apache.spark.sql.{SQLContext, SparkSession}
@@ -249,6 +250,40 @@ df.filter(df("COL1") === "
.show
</pre>
</div>
+ <p>Java example: </p>
+ <div class="source">
+ <pre>import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SQLContext;
+
+import static org.apache.phoenix.spark.datasource.v2.PhoenixDataSource.ZOOKEEPER_URL;
+
+public class PhoenixSparkRead {
+
+ public static void main() throws Exception {
+ SparkConf sparkConf = new SparkConf().setMaster("local").setAppName("phoenix-test");
+ JavaSparkContext jsc = new JavaSparkContext(sparkConf);
+ SQLContext sqlContext = new SQLContext(jsc);
+
+ // Load data from TABLE1
+ Dataset<Row> df = sqlContext
+ .read()
+ .format("phoenix")
+ .option("table", "TABLE1")
+ .option(ZOOKEEPER_URL, "phoenix-server:2181")
+ .load();
+ df.createOrReplaceTempView("TABLE1");
+
+ SQLContext sqlCtx = new SQLContext(jsc);
+ df = sqlCtx.sql("SELECT * FROM TABLE1 WHERE COL1='test_row_1' AND ID=1L");
+ df.show();
+ jsc.stop();
+ }
+}
+</pre>
+ </div>
</div>
</div>
<div class="section">
@@ -263,7 +298,7 @@ df.filter(df("COL1") === "
CREATE TABLE OUTPUT_TABLE (id BIGINT NOT NULL PRIMARY KEY, col1 VARCHAR, col2 INTEGER);
</pre>
</div>
- <p>you can load from an input table and save to an output table as a DataFrame as follows:</p>
+ <p>you can load from an input table and save to an output table as a DataFrame as follows in Scala:</p>
<div class="source">
<pre>import org.apache.spark.SparkContext
import org.apache.spark.sql.{SQLContext, SparkSession, SaveMode}
@@ -279,18 +314,55 @@ val spark = SparkSession
val df = spark.sqlContext
.read
.format("phoenix")
- .options(Map("table" -> "INPUT_TABLE", PhoenixDataSource.ZOOKEEPER_URL -> "hbaseConnectionString"))
+ .options(Map("table" -> "INPUT_TABLE", PhoenixDataSource.ZOOKEEPER_URL -> "phoenix-server:2181"))
.load
// Save to OUTPUT_TABLE
-df
- .write
+df.write
.format("phoenix")
.mode(SaveMode.Overwrite)
- .options(Map("table" -> "INPUT_TABLE", PhoenixDataSource.ZOOKEEPER_URL -> "hbaseConnectionString"))
+ .options(Map("table" -> "OUTPUT_TABLE", PhoenixDataSource.ZOOKEEPER_URL -> "phoenix-server:2181"))
.save()
</pre>
</div>
+ <p>Java example: </p>
+ <div class="source">
+ <pre>import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SaveMode;
+import org.apache.spark.sql.SQLContext;
+
+import static org.apache.phoenix.spark.datasource.v2.PhoenixDataSource.ZOOKEEPER_URL;
+
+public class PhoenixSparkWriteFromInputTable {
+
+ public static void main() throws Exception {
+ SparkConf sparkConf = new SparkConf().setMaster("local").setAppName("phoenix-test");
+ JavaSparkContext jsc = new JavaSparkContext(sparkConf);
+ SQLContext sqlContext = new SQLContext(jsc);
+
+ // Load INPUT_TABLE
+ Dataset<Row> df = sqlContext
+ .read()
+ .format("phoenix")
+ .option("table", "INPUT_TABLE")
+ .option(ZOOKEEPER_URL, "phoenix-server:2181")
+ .load();
+
+ // Save to OUTPUT_TABLE
+ df.write()
+ .format("phoenix")
+ .mode(SaveMode.Overwrite)
+ .option("table", "OUTPUT_TABLE")
+ .option(ZOOKEEPER_URL, "phoenix-server:2181")
+ .save();
+ jsc.stop();
+ }
+}
+</pre>
+ </div>
</div>
<div class="section">
<h4 id="Save_from_an_external_RDD_with_a_schema_to_a_Phoenix_table">Save from an external RDD with a schema to a Phoenix table</h4>
@@ -301,7 +373,7 @@ df
<pre>CREATE TABLE OUTPUT_TABLE (id BIGINT NOT NULL PRIMARY KEY, col1 VARCHAR, col2 INTEGER);
</pre>
</div>
- <p>you can save a dataframe from an RDD as follows: </p>
+ <p>you can save a dataframe from an RDD as follows in Scala: </p>
<div class="source">
<pre>import org.apache.spark.SparkContext
import org.apache.spark.sql.types.{IntegerType, LongType, StringType, StructType, StructField}
@@ -328,11 +400,66 @@ val df = spark.sqlContext.createDataFram
df.write
.format("phoenix")
- .options(Map("table" -> "OUTPUT_TEST_TABLE", PhoenixDataSource.ZOOKEEPER_URL -> "quorumAddress"))
+ .options(Map("table" -> "OUTPUT_TABLE", PhoenixDataSource.ZOOKEEPER_URL -> "phoenix-server:2181"))
.mode(SaveMode.Overwrite)
.save()
</pre>
</div>
+ <p>Java example: </p>
+ <div class="source">
+ <pre>import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.RowFactory;
+import org.apache.spark.sql.SaveMode;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.SQLContext;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.phoenix.spark.datasource.v2.PhoenixDataSource.ZOOKEEPER_URL;
+
+public class PhoenixSparkWriteFromRDDWithSchema {
+
+ public static void main() throws Exception {
+ SparkConf sparkConf = new SparkConf().setMaster("local").setAppName("phoenix-test");
+ JavaSparkContext jsc = new JavaSparkContext(sparkConf);
+ SQLContext sqlContext = new SQLContext(jsc);
+ SparkSession spark = sqlContext.sparkSession();
+ Dataset<Row> df;
+
+ // Generate the schema based on the fields
+ List<StructField> fields = new ArrayList<>();
+ fields.add(DataTypes.createStructField("ID", DataTypes.LongType, false));
+ fields.add(DataTypes.createStructField("COL1", DataTypes.StringType, true));
+ fields.add(DataTypes.createStructField("COL2", DataTypes.IntegerType, true));
+ StructType schema = DataTypes.createStructType(fields);
+
+ // Generate the rows with the same exact schema
+ List<Row> rows = new ArrayList<>();
+ for (int i = 1; i < 4; i++) {
+ rows.add(RowFactory.create(Long.valueOf(i), String.valueOf(i), i));
+ }
+
+ // Create a DataFrame from the rows and the specified schema
+ df = spark.createDataFrame(rows, schema);
+ df.write()
+ .format("phoenix")
+ .mode(SaveMode.Overwrite)
+ .option("table", "OUTPUT_TABLE")
+ .option(ZOOKEEPER_URL, "phoenix-server:2181")
+ .save();
+
+ jsc.stop();
+ }
+}
+</pre>
+ </div>
</div>
</div>
<div class="section">
@@ -340,12 +467,12 @@ df.write
<p>With Spark's DataFrame support, you can also use <tt>pyspark</tt> to read and write from Phoenix tables.</p>
<div class="section">
<h4 id="Load_a_DataFrame">Load a DataFrame</h4>
- <p>Given a table <i>TABLE1</i> and a Zookeeper url of <tt>localhost:2181</tt> you can load the table as a DataFrame using the following Python code in <tt>pyspark</tt></p>
+ <p>Given a table <i>TABLE1</i> and a Zookeeper url of <tt>phoenix-server:2181</tt> you can load the table as a DataFrame using the following Python code in <tt>pyspark</tt></p>
<div class="source">
<pre>df = sqlContext.read \
.format("phoenix") \
.option("table", "TABLE1") \
- .option("zkUrl", "localhost:2181") \
+ .option("zkUrl", "phoenix-server:2181") \
.load()
</pre>
</div>
@@ -358,7 +485,7 @@ df.write
.format("phoenix") \
.mode("overwrite") \
.option("table", "TABLE1") \
- .option("zkUrl", "localhost:2181") \
+ .option("zkUrl", "phoenix-server:2181") \
.save()
</pre>
</div>
@@ -377,7 +504,7 @@ df.write
.sqlContext
.read
.format("phoenix")
- .options(Map("table" -> "Table1", "zkUrl" -> "hosta,hostb,hostc",
+ .options(Map("table" -> "Table1", "zkUrl" -> "phoenix-server:2181",
"phoenixConfigs" -> "hbase.client.retries.number=10,hbase.client.pause=10000"))
.load;
</pre>
Modified: phoenix/site/source/src/site/markdown/phoenix_spark.md
URL: http://svn.apache.org/viewvc/phoenix/site/source/src/site/markdown/phoenix_spark.md?rev=1870483&r1=1870482&r2=1870483&view=diff
==============================================================================
--- phoenix/site/source/src/site/markdown/phoenix_spark.md (original)
+++ phoenix/site/source/src/site/markdown/phoenix_spark.md Wed Nov 27 03:15:21 2019
@@ -1,7 +1,7 @@
# Apache Spark Plugin
The phoenix-spark plugin extends Phoenix's MapReduce support to allow Spark to load Phoenix tables
-as RDDs or DataFrames, and enables persisting them back to Phoenix.
+as DataFrames, and enables persisting them back to Phoenix.
#### Prerequisites
@@ -28,8 +28,10 @@ The choice of which method to use to acc
for the Spark executors and drivers, set both '_spark.executor.extraClassPath_' and
'_spark.driver.extraClassPath_' in spark-defaults.conf to include the 'phoenix-_`<version>`_-client.jar'
-* Note that for Phoenix versions 4.7 and 4.8 you must use the 'phoenix-_`<version>`_-client-spark.jar'. As of Phoenix 4.10, the 'phoenix-_`<version>`_-client.jar' is compiled against Spark 2.x. If compability with Spark 1.x if needed, you must compile Phoenix with the `spark16` maven profile.
-
+* Note that for Phoenix versions 4.7 and 4.8 you must use the 'phoenix-_`<version>`_-client-spark.jar'.
+
+* As of Phoenix 4.10, the 'phoenix-_`<version>`_-client.jar' is compiled against Spark 2.x. If compatibility with Spark 1.x is needed, you must compile Phoenix with the `spark16` maven profile.
+
* To help your IDE, you can add the following _provided_ dependency to your build:
```
@@ -41,9 +43,21 @@ for the Spark executors and drivers, set
</dependency>
```
+* As of Phoenix 4.15.0, the connectors project will be separated from the main phoenix project (see [phoenix-connectors](https://github.com/apache/phoenix-connectors))
+and will have its own releases. You can add the following dependency in your project:
+
+```
+<dependency>
+ <groupId>org.apache.phoenix</groupId>
+ <artifactId>phoenix-spark</artifactId>
+ <version>${phoenix.connectors.version}</version>
+</dependency>
+```
+The first released connectors jar is `connectors-1.0.0` (replace above `phoenix.connectors.version` with this version)
+
### Reading Phoenix Tables
-Given a Phoenix table with the following DDL
+Given a Phoenix table with the following DDL and DML:
```sql
CREATE TABLE TABLE1 (ID BIGINT NOT NULL PRIMARY KEY, COL1 VARCHAR);
@@ -51,134 +65,251 @@ UPSERT INTO TABLE1 (ID, COL1) VALUES (1,
UPSERT INTO TABLE1 (ID, COL1) VALUES (2, 'test_row_2');
```
-#### Load as a DataFrame using the Data Source API
+#### Load as a DataFrame using the DataSourceV2 API
+Scala example:
+
```scala
import org.apache.spark.SparkContext
-import org.apache.spark.sql.SQLContext
-import org.apache.phoenix.spark._
-
-val sc = new SparkContext("local", "phoenix-test")
-val sqlContext = new SQLContext(sc)
+import org.apache.spark.sql.{SQLContext, SparkSession}
+import org.apache.phoenix.spark.datasource.v2.PhoenixDataSource
-val df = sqlContext.load(
- "org.apache.phoenix.spark",
- Map("table" -> "TABLE1", "zkUrl" -> "phoenix-server:2181")
-)
+val spark = SparkSession
+ .builder()
+ .appName("phoenix-test")
+ .master("local")
+ .getOrCreate()
+
+// Load data from TABLE1
+val df = spark.sqlContext
+ .read
+ .format("phoenix")
+ .options(Map("table" -> "TABLE1", PhoenixDataSource.ZOOKEEPER_URL -> "phoenix-server:2181"))
+ .load
-df
- .filter(df("COL1") === "test_row_1" && df("ID") === 1L)
+df.filter(df("COL1") === "test_row_1" && df("ID") === 1L)
.select(df("ID"))
.show
```
-#### Load as a DataFrame directly using a Configuration object
-```scala
-import org.apache.hadoop.conf.Configuration
-import org.apache.spark.SparkContext
-import org.apache.spark.sql.SQLContext
-import org.apache.phoenix.spark._
-
-val configuration = new Configuration()
-// Can set Phoenix-specific settings, requires 'hbase.zookeeper.quorum'
-
-val sc = new SparkContext("local", "phoenix-test")
-val sqlContext = new SQLContext(sc)
-
-// Load the columns 'ID' and 'COL1' from TABLE1 as a DataFrame
-val df = sqlContext.phoenixTableAsDataFrame(
- "TABLE1", Array("ID", "COL1"), conf = configuration
-)
+Java example:
-df.show
+```java
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SQLContext;
+
+import static org.apache.phoenix.spark.datasource.v2.PhoenixDataSource.ZOOKEEPER_URL;
+
+public class PhoenixSparkRead {
+
+ public static void main() throws Exception {
+ SparkConf sparkConf = new SparkConf().setMaster("local").setAppName("phoenix-test");
+ JavaSparkContext jsc = new JavaSparkContext(sparkConf);
+ SQLContext sqlContext = new SQLContext(jsc);
+
+ // Load data from TABLE1
+ Dataset<Row> df = sqlContext
+ .read()
+ .format("phoenix")
+ .option("table", "TABLE1")
+ .option(ZOOKEEPER_URL, "phoenix-server:2181")
+ .load();
+ df.createOrReplaceTempView("TABLE1");
+
+ SQLContext sqlCtx = new SQLContext(jsc);
+ df = sqlCtx.sql("SELECT * FROM TABLE1 WHERE COL1='test_row_1' AND ID=1L");
+ df.show();
+ jsc.stop();
+ }
+}
```
-#### Load as an RDD, using a Zookeeper URL
-```scala
-import org.apache.spark.SparkContext
-import org.apache.spark.sql.SQLContext
-import org.apache.phoenix.spark._
-
-val sc = new SparkContext("local", "phoenix-test")
-
-// Load the columns 'ID' and 'COL1' from TABLE1 as an RDD
-val rdd: RDD[Map[String, AnyRef]] = sc.phoenixTableAsRDD(
- "TABLE1", Seq("ID", "COL1"), zkUrl = Some("phoenix-server:2181")
-)
+### Saving to Phoenix
-rdd.count()
+#### Save DataFrames to Phoenix using DataSourceV2
-val firstId = rdd1.first()("ID").asInstanceOf[Long]
-val firstCol = rdd1.first()("COL1").asInstanceOf[String]
-```
+The `save` method on DataFrame allows passing in a data source type. You can use
+`phoenix` for DataSourceV2 and must also pass in a `table` and `zkUrl` parameter to
+specify which table and server to persist the DataFrame to. The column names are derived from
+the DataFrame's schema field names, and must match the Phoenix column names.
-### Saving Phoenix
+The `save` method also takes a `SaveMode` option, for which only `SaveMode.Overwrite` is supported.
-Given a Phoenix table with the following DDL
+Given two Phoenix tables with the following DDL:
```sql
-CREATE TABLE OUTPUT_TEST_TABLE (id BIGINT NOT NULL PRIMARY KEY, col1 VARCHAR, col2 INTEGER);
+CREATE TABLE INPUT_TABLE (id BIGINT NOT NULL PRIMARY KEY, col1 VARCHAR, col2 INTEGER);
+CREATE TABLE OUTPUT_TABLE (id BIGINT NOT NULL PRIMARY KEY, col1 VARCHAR, col2 INTEGER);
```
-
-#### Saving RDDs
-
-The `saveToPhoenix` method is an implicit method on RDD[Product], or an RDD of Tuples. The data types must
-correspond to one of [the Java types supported by Phoenix](language/datatypes.html).
-
+you can load from an input table and save to an output table as a DataFrame as follows in Scala:
```scala
import org.apache.spark.SparkContext
-import org.apache.phoenix.spark._
+import org.apache.spark.sql.{SQLContext, SparkSession, SaveMode}
+import org.apache.phoenix.spark.datasource.v2.PhoenixDataSource
-val sc = new SparkContext("local", "phoenix-test")
-val dataSet = List((1L, "1", 1), (2L, "2", 2), (3L, "3", 3))
+val spark = SparkSession
+ .builder()
+ .appName("phoenix-test")
+ .master("local")
+ .getOrCreate()
+
+// Load INPUT_TABLE
+val df = spark.sqlContext
+ .read
+ .format("phoenix")
+ .options(Map("table" -> "INPUT_TABLE", PhoenixDataSource.ZOOKEEPER_URL -> "phoenix-server:2181"))
+ .load
-sc
- .parallelize(dataSet)
- .saveToPhoenix(
- "OUTPUT_TEST_TABLE",
- Seq("ID","COL1","COL2"),
- zkUrl = Some("phoenix-server:2181")
- )
+// Save to OUTPUT_TABLE
+df.write
+ .format("phoenix")
+ .mode(SaveMode.Overwrite)
+ .options(Map("table" -> "OUTPUT_TABLE", PhoenixDataSource.ZOOKEEPER_URL -> "phoenix-server:2181"))
+ .save()
```
-#### Saving DataFrames
+Java example:
-The `save` is method on DataFrame allows passing in a data source type. You can use
-`org.apache.phoenix.spark`, and must also pass in a `table` and `zkUrl` parameter to
-specify which table and server to persist the DataFrame to. The column names are derived from
-the DataFrame's schema field names, and must match the Phoenix column names.
+```java
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SaveMode;
+import org.apache.spark.sql.SQLContext;
+
+import static org.apache.phoenix.spark.datasource.v2.PhoenixDataSource.ZOOKEEPER_URL;
+
+public class PhoenixSparkWriteFromInputTable {
+
+ public static void main() throws Exception {
+ SparkConf sparkConf = new SparkConf().setMaster("local").setAppName("phoenix-test");
+ JavaSparkContext jsc = new JavaSparkContext(sparkConf);
+ SQLContext sqlContext = new SQLContext(jsc);
+
+ // Load INPUT_TABLE
+ Dataset<Row> df = sqlContext
+ .read()
+ .format("phoenix")
+ .option("table", "INPUT_TABLE")
+ .option(ZOOKEEPER_URL, "phoenix-server:2181")
+ .load();
+
+ // Save to OUTPUT_TABLE
+ df.write()
+ .format("phoenix")
+ .mode(SaveMode.Overwrite)
+ .option("table", "OUTPUT_TABLE")
+ .option(ZOOKEEPER_URL, "phoenix-server:2181")
+ .save();
+ jsc.stop();
+ }
+}
+```
+
+#### Save from an external RDD with a schema to a Phoenix table
-The `save` method also takes a `SaveMode` option, for which only `SaveMode.Overwrite` is supported.
+Just like the previous example, you can pass in the data source type as `phoenix` and specify the `table` and
+`zkUrl` parameters indicating which table and server to persist the DataFrame to.
-Given two Phoenix tables with the following DDL:
+Note that the schema of the RDD must match its column data and this must match the schema of the Phoenix table
+that you save to.
+
+Given an output Phoenix table with the following DDL:
```sql
-CREATE TABLE INPUT_TABLE (id BIGINT NOT NULL PRIMARY KEY, col1 VARCHAR, col2 INTEGER);
CREATE TABLE OUTPUT_TABLE (id BIGINT NOT NULL PRIMARY KEY, col1 VARCHAR, col2 INTEGER);
```
+you can save a dataframe from an RDD as follows in Scala:
```scala
import org.apache.spark.SparkContext
-import org.apache.spark.sql._
-import org.apache.phoenix.spark._
-
-// Load INPUT_TABLE
-val sc = new SparkContext("local", "phoenix-test")
-val sqlContext = new SQLContext(sc)
-val df = sqlContext.load("org.apache.phoenix.spark", Map("table" -> "INPUT_TABLE",
- "zkUrl" -> hbaseConnectionString))
-
-// Save to OUTPUT_TABLE
-df.saveToPhoenix(Map("table" -> "OUTPUT_TABLE", "zkUrl" -> hbaseConnectionString))
+import org.apache.spark.sql.types.{IntegerType, LongType, StringType, StructType, StructField}
+import org.apache.spark.sql.{Row, SQLContext, SparkSession, SaveMode}
+import org.apache.phoenix.spark.datasource.v2.PhoenixDataSource
+
+val spark = SparkSession
+ .builder()
+ .appName("phoenix-test")
+ .master("local")
+ .getOrCreate()
+
+val dataSet = List(Row(1L, "1", 1), Row(2L, "2", 2), Row(3L, "3", 3))
+
+val schema = StructType(
+ Seq(StructField("ID", LongType, nullable = false),
+ StructField("COL1", StringType),
+ StructField("COL2", IntegerType)))
+
+val rowRDD = spark.sparkContext.parallelize(dataSet)
+
+// Apply the schema to the RDD.
+val df = spark.sqlContext.createDataFrame(rowRDD, schema)
+
+df.write
+ .format("phoenix")
+ .options(Map("table" -> "OUTPUT_TABLE", PhoenixDataSource.ZOOKEEPER_URL -> "phoenix-server:2181"))
+ .mode(SaveMode.Overwrite)
+ .save()
+```
-or
+Java example:
-df.write \
- .format("org.apache.phoenix.spark") \
- .mode("overwrite") \
- .option("table", "OUTPUT_TABLE") \
- .option("zkUrl", "localhost:2181") \
- .save()
+```java
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.RowFactory;
+import org.apache.spark.sql.SaveMode;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.SQLContext;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.phoenix.spark.datasource.v2.PhoenixDataSource.ZOOKEEPER_URL;
+
+public class PhoenixSparkWriteFromRDDWithSchema {
+
+ public static void main() throws Exception {
+ SparkConf sparkConf = new SparkConf().setMaster("local").setAppName("phoenix-test");
+ JavaSparkContext jsc = new JavaSparkContext(sparkConf);
+ SQLContext sqlContext = new SQLContext(jsc);
+ SparkSession spark = sqlContext.sparkSession();
+ Dataset<Row> df;
+
+ // Generate the schema based on the fields
+ List<StructField> fields = new ArrayList<>();
+ fields.add(DataTypes.createStructField("ID", DataTypes.LongType, false));
+ fields.add(DataTypes.createStructField("COL1", DataTypes.StringType, true));
+ fields.add(DataTypes.createStructField("COL2", DataTypes.IntegerType, true));
+ StructType schema = DataTypes.createStructType(fields);
+
+ // Generate the rows with the same exact schema
+ List<Row> rows = new ArrayList<>();
+ for (int i = 1; i < 4; i++) {
+ rows.add(RowFactory.create(Long.valueOf(i), String.valueOf(i), i));
+ }
+
+ // Create a DataFrame from the rows and the specified schema
+ df = spark.createDataFrame(rows, schema);
+ df.write()
+ .format("phoenix")
+ .mode(SaveMode.Overwrite)
+ .option("table", "OUTPUT_TABLE")
+ .option(ZOOKEEPER_URL, "phoenix-server:2181")
+ .save();
+
+ jsc.stop();
+ }
+}
```
### PySpark
@@ -187,14 +318,14 @@ With Spark's DataFrame support, you can
#### Load a DataFrame
-Given a table _TABLE1_ and a Zookeeper url of `localhost:2181` you can load the table as a
+Given a table _TABLE1_ and a Zookeeper url of `phoenix-server:2181` you can load the table as a
DataFrame using the following Python code in `pyspark`
```python
df = sqlContext.read \
- .format("org.apache.phoenix.spark") \
+ .format("phoenix") \
.option("table", "TABLE1") \
- .option("zkUrl", "localhost:2181") \
+ .option("zkUrl", "phoenix-server:2181") \
.load()
```
@@ -205,22 +336,38 @@ using the following code
```python
df.write \
- .format("org.apache.phoenix.spark") \
+ .format("phoenix") \
.mode("overwrite") \
.option("table", "TABLE1") \
- .option("zkUrl", "localhost:2181") \
+ .option("zkUrl", "phoenix-server:2181") \
.save()
```
### Notes
-The functions `phoenixTableAsDataFrame`, `phoenixTableAsRDD` and `saveToPhoenix` all support
+- If you want to use DataSourceV1, you can use source type `"org.apache.phoenix.spark"`
+ instead of `"phoenix"`, however this is deprecated as of `connectors-1.0.0`.
+- The (deprecated) functions `phoenixTableAsDataFrame`, `phoenixTableAsRDD` and `saveToPhoenix` all support
optionally specifying a `conf` Hadoop configuration parameter with custom Phoenix client settings,
as well as an optional `zkUrl` parameter for the Phoenix connection URL.
-
-If `zkUrl` isn't specified, it's assumed that the "hbase.zookeeper.quorum" property has been set
+- If `zkUrl` isn't specified, it's assumed that the "hbase.zookeeper.quorum" property has been set
in the `conf` parameter. Similarly, if no configuration is passed in, `zkUrl` must be specified.
+- As of [PHOENIX-5197](https://issues.apache.org/jira/browse/PHOENIX-5197), you can pass configurations from the driver
+to executors as a comma-separated list against the key `phoenixConfigs`, i.e. (PhoenixDataSource.PHOENIX_CONFIGS), for example:
+
+```scala
+df = spark
+ .sqlContext
+ .read
+ .format("phoenix")
+ .options(Map("table" -> "Table1", "zkUrl" -> "phoenix-server:2181",
+ "phoenixConfigs" -> "hbase.client.retries.number=10,hbase.client.pause=10000"))
+ .load;
+```
+This list of properties is parsed and populated into a properties map which is passed to `DriverManager.getConnection(connString, propsMap)`.
+Note that the same property values will be used for both the driver and all executors and
+these configurations are used each time a connection is made (both on the driver and executors).
### Limitations
@@ -281,3 +428,74 @@ saves the results back to Phoenix.
| 588 | 106.11840798585399 |
+------------------------------------------+------------------------------------------+
```
+***
+
+### Deprecated Usages
+
+#### Load as a DataFrame directly using a Configuration object
+```scala
+import org.apache.hadoop.conf.Configuration
+import org.apache.spark.SparkContext
+import org.apache.spark.sql.SQLContext
+import org.apache.phoenix.spark._
+
+val configuration = new Configuration()
+// Can set Phoenix-specific settings, requires 'hbase.zookeeper.quorum'
+
+val sc = new SparkContext("local", "phoenix-test")
+val sqlContext = new SQLContext(sc)
+
+// Load the columns 'ID' and 'COL1' from TABLE1 as a DataFrame
+val df = sqlContext.phoenixTableAsDataFrame(
+ "TABLE1", Array("ID", "COL1"), conf = configuration
+)
+
+df.show
+```
+
+#### Load as an RDD, using a Zookeeper URL
+```scala
+import org.apache.spark.SparkContext
+import org.apache.spark.sql.SQLContext
+import org.apache.phoenix.spark._
+import org.apache.spark.rdd.RDD
+
+val sc = new SparkContext("local", "phoenix-test")
+
+// Load the columns 'ID' and 'COL1' from TABLE1 as an RDD
+val rdd: RDD[Map[String, AnyRef]] = sc.phoenixTableAsRDD(
+ "TABLE1", Seq("ID", "COL1"), zkUrl = Some("phoenix-server:2181")
+)
+
+rdd.count()
+
+val firstId = rdd.first()("ID").asInstanceOf[Long]
+val firstCol = rdd.first()("COL1").asInstanceOf[String]
+```
+
+#### Saving RDDs to Phoenix
+
+`saveToPhoenix` is an implicit method on RDD[Product], or an RDD of Tuples. The data types must
+correspond to the Java types Phoenix supports (http://phoenix.apache.org/language/datatypes.html)
+
+Given a Phoenix table with the following DDL:
+
+```sql
+CREATE TABLE OUTPUT_TEST_TABLE (id BIGINT NOT NULL PRIMARY KEY, col1 VARCHAR, col2 INTEGER);
+```
+
+```scala
+import org.apache.spark.SparkContext
+import org.apache.phoenix.spark._
+
+val sc = new SparkContext("local", "phoenix-test")
+val dataSet = List((1L, "1", 1), (2L, "2", 2), (3L, "3", 3))
+
+sc
+ .parallelize(dataSet)
+ .saveToPhoenix(
+ "OUTPUT_TEST_TABLE",
+ Seq("ID","COL1","COL2"),
+ zkUrl = Some("phoenix-server:2181")
+ )
+```