Posted to commits@phoenix.apache.org by ch...@apache.org on 2019/11/27 02:58:15 UTC
[phoenix-connectors] branch master updated: PHOENIX-5585: Add documentation for Phoenix-Spark Java example
This is an automated email from the ASF dual-hosted git repository.
chinmayskulkarni pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/phoenix-connectors.git
The following commit(s) were added to refs/heads/master by this push:
new 77e2ad3 PHOENIX-5585: Add documentation for Phoenix-Spark Java example
77e2ad3 is described below
commit 77e2ad39f46afb67c8675bd0bff9af7795096a60
Author: Chinmay Kulkarni <ch...@gmail.com>
AuthorDate: Tue Nov 26 18:22:31 2019 -0800
PHOENIX-5585: Add documentation for Phoenix-Spark Java example
---
phoenix-spark/README.md | 140 +++++++++++++++++++++++++++++++++++++++++++++---
1 file changed, 132 insertions(+), 8 deletions(-)
diff --git a/phoenix-spark/README.md b/phoenix-spark/README.md
index f906317..2540b73 100644
--- a/phoenix-spark/README.md
+++ b/phoenix-spark/README.md
@@ -29,6 +29,7 @@ UPSERT INTO TABLE1 (ID, COL1) VALUES (2, 'test_row_2');
```
### Load as a DataFrame using the DataSourceV2 API
+Scala example:
```scala
import org.apache.spark.SparkContext
import org.apache.spark.sql.{SQLContext, SparkSession}
@@ -51,6 +52,39 @@ df.filter(df("COL1") === "test_row_1" && df("ID") === 1L)
.select(df("ID"))
.show
```
+Java example:
+```java
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SQLContext;
+
+import static org.apache.phoenix.spark.datasource.v2.PhoenixDataSource.ZOOKEEPER_URL;
+
+public class PhoenixSparkRead {
+
+    public static void main(String[] args) throws Exception {
+        SparkConf sparkConf = new SparkConf().setMaster("local").setAppName("phoenix-test");
+        JavaSparkContext jsc = new JavaSparkContext(sparkConf);
+        SQLContext sqlContext = new SQLContext(jsc);
+
+        // Load data from TABLE1
+        Dataset<Row> df = sqlContext
+            .read()
+            .format("phoenix")
+            .option("table", "TABLE1")
+            .option(ZOOKEEPER_URL, "phoenix-server:2181")
+            .load();
+        df.createOrReplaceTempView("TABLE1");
+
+        // Query the registered temp view with Spark SQL
+        df = sqlContext.sql("SELECT * FROM TABLE1 WHERE COL1='test_row_1' AND ID=1L");
+        df.show();
+        jsc.stop();
+    }
+}
+```
## Saving to Phoenix
@@ -69,7 +103,7 @@ Given two Phoenix tables with the following DDL:
CREATE TABLE INPUT_TABLE (id BIGINT NOT NULL PRIMARY KEY, col1 VARCHAR, col2 INTEGER);
CREATE TABLE OUTPUT_TABLE (id BIGINT NOT NULL PRIMARY KEY, col1 VARCHAR, col2 INTEGER);
```
-you can load from an input table and save to an output table as a DataFrame as follows:
+you can load from an input table and save to an output table as a DataFrame as follows in Scala:
```scala
import org.apache.spark.SparkContext
@@ -86,17 +120,53 @@ val spark = SparkSession
val df = spark.sqlContext
.read
.format("phoenix")
- .options(Map("table" -> "INPUT_TABLE", PhoenixDataSource.ZOOKEEPER_URL -> "hbaseConnectionString"))
+ .options(Map("table" -> "INPUT_TABLE", PhoenixDataSource.ZOOKEEPER_URL -> "phoenix-server:2181"))
.load
// Save to OUTPUT_TABLE
-df
- .write
+df.write
.format("phoenix")
.mode(SaveMode.Overwrite)
- .options(Map("table" -> "INPUT_TABLE", PhoenixDataSource.ZOOKEEPER_URL -> "hbaseConnectionString"))
+ .options(Map("table" -> "OUTPUT_TABLE", PhoenixDataSource.ZOOKEEPER_URL -> "phoenix-server:2181"))
.save()
```
+Java example:
+```java
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SaveMode;
+import org.apache.spark.sql.SQLContext;
+
+import static org.apache.phoenix.spark.datasource.v2.PhoenixDataSource.ZOOKEEPER_URL;
+
+public class PhoenixSparkWriteFromInputTable {
+
+    public static void main(String[] args) throws Exception {
+        SparkConf sparkConf = new SparkConf().setMaster("local").setAppName("phoenix-test");
+        JavaSparkContext jsc = new JavaSparkContext(sparkConf);
+        SQLContext sqlContext = new SQLContext(jsc);
+
+        // Load INPUT_TABLE
+        Dataset<Row> df = sqlContext
+            .read()
+            .format("phoenix")
+            .option("table", "INPUT_TABLE")
+            .option(ZOOKEEPER_URL, "phoenix-server:2181")
+            .load();
+
+        // Save to OUTPUT_TABLE
+        df.write()
+            .format("phoenix")
+            .mode(SaveMode.Overwrite)
+            .option("table", "OUTPUT_TABLE")
+            .option(ZOOKEEPER_URL, "phoenix-server:2181")
+            .save();
+        jsc.stop();
+    }
+}
+```
### Save from an external RDD with a schema to a Phoenix table
@@ -111,7 +181,7 @@ Given an output Phoenix table with the following DDL:
```sql
CREATE TABLE OUTPUT_TABLE (id BIGINT NOT NULL PRIMARY KEY, col1 VARCHAR, col2 INTEGER);
```
-you can save a dataframe from an RDD as follows:
+you can save a dataframe from an RDD as follows in Scala:
```scala
import org.apache.spark.SparkContext
@@ -139,10 +209,64 @@ val df = spark.sqlContext.createDataFrame(rowRDD, schema)
df.write
.format("phoenix")
- .options(Map("table" -> "OUTPUT_TEST_TABLE", PhoenixDataSource.ZOOKEEPER_URL -> "quorumAddress"))
+ .options(Map("table" -> "OUTPUT_TABLE", PhoenixDataSource.ZOOKEEPER_URL -> "phoenix-server:2181"))
.mode(SaveMode.Overwrite)
.save()
```
+Java example:
+```java
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.RowFactory;
+import org.apache.spark.sql.SaveMode;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.SQLContext;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.phoenix.spark.datasource.v2.PhoenixDataSource.ZOOKEEPER_URL;
+
+public class PhoenixSparkWriteFromRDDWithSchema {
+
+    public static void main(String[] args) throws Exception {
+        SparkConf sparkConf = new SparkConf().setMaster("local").setAppName("phoenix-test");
+        JavaSparkContext jsc = new JavaSparkContext(sparkConf);
+        SQLContext sqlContext = new SQLContext(jsc);
+        SparkSession spark = sqlContext.sparkSession();
+
+        // Generate the schema based on the fields
+        List<StructField> fields = new ArrayList<>();
+        fields.add(DataTypes.createStructField("ID", DataTypes.LongType, false));
+        fields.add(DataTypes.createStructField("COL1", DataTypes.StringType, true));
+        fields.add(DataTypes.createStructField("COL2", DataTypes.IntegerType, true));
+        StructType schema = DataTypes.createStructType(fields);
+
+        // Generate rows that match the schema exactly
+        List<Row> rows = new ArrayList<>();
+        for (int i = 1; i < 4; i++) {
+            rows.add(RowFactory.create(Long.valueOf(i), String.valueOf(i), i));
+        }
+
+        // Create a DataFrame from the rows and the specified schema
+        Dataset<Row> df = spark.createDataFrame(rows, schema);
+        df.write()
+            .format("phoenix")
+            .mode(SaveMode.Overwrite)
+            .option("table", "OUTPUT_TABLE")
+            .option(ZOOKEEPER_URL, "phoenix-server:2181")
+            .save();
+
+        jsc.stop();
+    }
+}
+```
## Notes
@@ -160,7 +284,7 @@ to executors as a comma-separated list against the key `phoenixConfigs` i.e (Pho
.sqlContext
.read
.format("phoenix")
- .options(Map("table" -> "Table1", "zkUrl" -> "hosta,hostb,hostc", "phoenixConfigs" -> "hbase.client.retries.number=10,hbase.client.pause=10000"))
+ .options(Map("table" -> "Table1", "zkUrl" -> "phoenix-server:2181", "phoenixConfigs" -> "hbase.client.retries.number=10,hbase.client.pause=10000"))
.load;
```
This list of properties is parsed and populated into a properties map which is passed to `DriverManager.getConnection(connString, propsMap)`.
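For illustration, a minimal sketch of what that parsing could look like; the helper `toProperties` below is hypothetical (not the connector's actual API), and only the `DriverManager.getConnection(connString, propsMap)` call and the `jdbc:phoenix:` URL format reflect what the text above describes:
```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.util.Properties;

public class PhoenixConfigsExample {

    // Hypothetical helper: splits a "k1=v1,k2=v2" string, like the one passed
    // under the `phoenixConfigs` key, into a java.util.Properties map.
    static Properties toProperties(String phoenixConfigs) {
        Properties props = new Properties();
        for (String pair : phoenixConfigs.split(",")) {
            String[] kv = pair.split("=", 2);
            props.setProperty(kv[0], kv[1]);
        }
        return props;
    }

    public static void main(String[] args) throws Exception {
        Properties props = toProperties(
            "hbase.client.retries.number=10,hbase.client.pause=10000");
        // The properties map is handed to DriverManager.getConnection,
        // overriding the listed client-side configs for this connection
        try (Connection conn = DriverManager.getConnection(
                "jdbc:phoenix:phoenix-server:2181", props)) {
            System.out.println("Connected with overridden client-side configs");
        }
    }
}
```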