Posted to commits@phoenix.apache.org by ch...@apache.org on 2019/11/27 02:58:15 UTC

[phoenix-connectors] branch master updated: PHOENIX-5585: Add documentation for Phoenix-Spark Java example

This is an automated email from the ASF dual-hosted git repository.

chinmayskulkarni pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/phoenix-connectors.git


The following commit(s) were added to refs/heads/master by this push:
     new 77e2ad3  PHOENIX-5585: Add documentation for Phoenix-Spark Java example
77e2ad3 is described below

commit 77e2ad39f46afb67c8675bd0bff9af7795096a60
Author: Chinmay Kulkarni <ch...@gmail.com>
AuthorDate: Tue Nov 26 18:22:31 2019 -0800

    PHOENIX-5585: Add documentation for Phoenix-Spark Java example
---
 phoenix-spark/README.md | 140 +++++++++++++++++++++++++++++++++++++++++++++---
 1 file changed, 132 insertions(+), 8 deletions(-)

diff --git a/phoenix-spark/README.md b/phoenix-spark/README.md
index f906317..2540b73 100644
--- a/phoenix-spark/README.md
+++ b/phoenix-spark/README.md
@@ -29,6 +29,7 @@ UPSERT INTO TABLE1 (ID, COL1) VALUES (2, 'test_row_2');
 ```
 
 ### Load as a DataFrame using the DataSourceV2 API
+Scala example:
 ```scala
 import org.apache.spark.SparkContext
 import org.apache.spark.sql.{SQLContext, SparkSession}
@@ -51,6 +52,39 @@ df.filter(df("COL1") === "test_row_1" && df("ID") === 1L)
   .select(df("ID"))
   .show
 ```
+Java example:
+```java
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SQLContext;
+
+import static org.apache.phoenix.spark.datasource.v2.PhoenixDataSource.ZOOKEEPER_URL;
+
+public class PhoenixSparkRead {
+    
+    public static void main(String[] args) throws Exception {
+        SparkConf sparkConf = new SparkConf().setMaster("local").setAppName("phoenix-test");
+        JavaSparkContext jsc = new JavaSparkContext(sparkConf);
+        SQLContext sqlContext = new SQLContext(jsc);
+        
+        // Load data from TABLE1
+        Dataset<Row> df = sqlContext
+            .read()
+            .format("phoenix")
+            .option("table", "TABLE1")
+            .option(ZOOKEEPER_URL, "phoenix-server:2181")
+            .load();
+        df.createOrReplaceTempView("TABLE1");
+    
+        // Query the registered temporary view with Spark SQL
+        df = sqlContext.sql("SELECT * FROM TABLE1 WHERE COL1='test_row_1' AND ID=1L");
+        df.show();
+        jsc.stop();
+    }
+}
+```
 
 ## Saving to Phoenix
 
@@ -69,7 +103,7 @@ Given two Phoenix tables with the following DDL:
 CREATE TABLE INPUT_TABLE (id BIGINT NOT NULL PRIMARY KEY, col1 VARCHAR, col2 INTEGER);
 CREATE TABLE OUTPUT_TABLE (id BIGINT NOT NULL PRIMARY KEY, col1 VARCHAR, col2 INTEGER);
 ```
-you can load from an input table and save to an output table as a DataFrame as follows:
+you can load from an input table and save to an output table as a DataFrame as follows in Scala:
 
 ```scala
 import org.apache.spark.SparkContext
@@ -86,17 +120,53 @@ val spark = SparkSession
 val df = spark.sqlContext
   .read
   .format("phoenix")
-  .options(Map("table" -> "INPUT_TABLE", PhoenixDataSource.ZOOKEEPER_URL -> "hbaseConnectionString"))
+  .options(Map("table" -> "INPUT_TABLE", PhoenixDataSource.ZOOKEEPER_URL -> "phoenix-server:2181"))
   .load
 
 // Save to OUTPUT_TABLE
-df
-  .write
+df.write
   .format("phoenix")
   .mode(SaveMode.Overwrite)
-  .options(Map("table" -> "INPUT_TABLE", PhoenixDataSource.ZOOKEEPER_URL -> "hbaseConnectionString"))
+  .options(Map("table" -> "OUTPUT_TABLE", PhoenixDataSource.ZOOKEEPER_URL -> "phoenix-server:2181"))
   .save()
 ```
+Java example:
+```java
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SaveMode;
+import org.apache.spark.sql.SQLContext;
+
+import static org.apache.phoenix.spark.datasource.v2.PhoenixDataSource.ZOOKEEPER_URL;
+
+public class PhoenixSparkWriteFromInputTable {
+    
+    public static void main(String[] args) throws Exception {
+        SparkConf sparkConf = new SparkConf().setMaster("local").setAppName("phoenix-test");
+        JavaSparkContext jsc = new JavaSparkContext(sparkConf);
+        SQLContext sqlContext = new SQLContext(jsc);
+        
+        // Load INPUT_TABLE
+        Dataset<Row> df = sqlContext
+            .read()
+            .format("phoenix")
+            .option("table", "INPUT_TABLE")
+            .option(ZOOKEEPER_URL, "phoenix-server:2181")
+            .load();
+        
+        // Save to OUTPUT_TABLE
+        df.write()
+          .format("phoenix")
+          .mode(SaveMode.Overwrite)
+          .option("table", "OUTPUT_TABLE")
+          .option(ZOOKEEPER_URL, "phoenix-server:2181")
+          .save();
+        jsc.stop();
+    }
+}
+```
 
 ### Save from an external RDD with a schema to a Phoenix table
 
@@ -111,7 +181,7 @@ Given an output Phoenix table with the following DDL:
 ```sql
 CREATE TABLE OUTPUT_TABLE (id BIGINT NOT NULL PRIMARY KEY, col1 VARCHAR, col2 INTEGER);
 ```
-you can save a dataframe from an RDD as follows: 
+you can save a DataFrame from an RDD as follows in Scala:
 
 ```scala
 import org.apache.spark.SparkContext
@@ -139,10 +209,64 @@ val df = spark.sqlContext.createDataFrame(rowRDD, schema)
 
 df.write
   .format("phoenix")
-  .options(Map("table" -> "OUTPUT_TEST_TABLE", PhoenixDataSource.ZOOKEEPER_URL -> "quorumAddress"))
+  .options(Map("table" -> "OUTPUT_TABLE", PhoenixDataSource.ZOOKEEPER_URL -> "phoenix-server:2181"))
   .mode(SaveMode.Overwrite)
   .save()
 ```
+Java example:
+```java
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.RowFactory;
+import org.apache.spark.sql.SaveMode;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.SQLContext;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.phoenix.spark.datasource.v2.PhoenixDataSource.ZOOKEEPER_URL;
+
+public class PhoenixSparkWriteFromRDDWithSchema {
+ 
+    public static void main(String[] args) throws Exception {
+        SparkConf sparkConf = new SparkConf().setMaster("local").setAppName("phoenix-test");
+        JavaSparkContext jsc = new JavaSparkContext(sparkConf);
+        SQLContext sqlContext = new SQLContext(jsc);
+        SparkSession spark = sqlContext.sparkSession();
+        Dataset<Row> df;
+  
+        // Generate the schema based on the fields
+        List<StructField> fields = new ArrayList<>();
+        fields.add(DataTypes.createStructField("ID", DataTypes.LongType, false));
+        fields.add(DataTypes.createStructField("COL1", DataTypes.StringType, true));
+        fields.add(DataTypes.createStructField("COL2", DataTypes.IntegerType, true));
+        StructType schema = DataTypes.createStructType(fields);
+  
+        // Generate the rows with the same exact schema
+        List<Row> rows = new ArrayList<>();
+        for (int i = 1; i < 4; i++) {
+            rows.add(RowFactory.create(Long.valueOf(i), String.valueOf(i), i));
+        }
+  
+        // Create a DataFrame from the rows and the specified schema
+        df = spark.createDataFrame(rows, schema);
+        df.write()
+            .format("phoenix")
+            .mode(SaveMode.Overwrite)
+            .option("table", "OUTPUT_TABLE")
+            .option(ZOOKEEPER_URL, "phoenix-server:2181")
+            .save();
+  
+        jsc.stop();
+    }
+}
+```
 
 ## Notes
 
@@ -160,7 +284,7 @@ to executors as a comma-separated list against the key `phoenixConfigs` i.e (Pho
       .sqlContext
       .read
       .format("phoenix")
-      .options(Map("table" -> "Table1", "zkUrl" -> "hosta,hostb,hostc", "phoenixConfigs" -> "hbase.client.retries.number=10,hbase.client.pause=10000"))
+      .options(Map("table" -> "Table1", "zkUrl" -> "phoenix-server:2181", "phoenixConfigs" -> "hbase.client.retries.number=10,hbase.client.pause=10000"))
       .load;
     ```
     This list of properties is parsed and populated into a properties map which is passed to `DriverManager.getConnection(connString, propsMap)`.
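+    For comparison, a minimal Java sketch of the same read; the option keys and values are taken from the Scala snippet above, and the `sqlContext` setup is assumed to match the earlier Java examples:
+    ```java
+    // Pass extra HBase/Phoenix client properties via the "phoenixConfigs" option
+    Dataset<Row> df = sqlContext
+        .read()
+        .format("phoenix")
+        .option("table", "Table1")
+        .option("zkUrl", "phoenix-server:2181")
+        .option("phoenixConfigs", "hbase.client.retries.number=10,hbase.client.pause=10000")
+        .load();
+    df.show();
+    ```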