Posted to commits@livy.apache.org by lr...@apache.org on 2017/07/20 01:37:18 UTC

[32/50] [abbrv] incubator-livy-website git commit: [BAHIR-101] Update sql-cloudant readme and python examples

[BAHIR-101] Update sql-cloudant readme and python examples

Closes #40.


Project: http://git-wip-us.apache.org/repos/asf/incubator-livy-website/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-livy-website/commit/561291bf
Tree: http://git-wip-us.apache.org/repos/asf/incubator-livy-website/tree/561291bf
Diff: http://git-wip-us.apache.org/repos/asf/incubator-livy-website/diff/561291bf

Branch: refs/heads/master
Commit: 561291bfc17f8eae97318b39ea9cc2d80680d5ce
Parents: 889de65
Author: Esteban Laver <em...@us.ibm.com>
Authored: Mon Apr 3 18:05:44 2017 -0400
Committer: Luciano Resende <lr...@apache.org>
Committed: Thu Apr 6 08:28:10 2017 -0700

----------------------------------------------------------------------
 sql-cloudant/README.md                          | 306 ++++++++-----------
 sql-cloudant/examples/python/CloudantApp.py     |   9 +-
 sql-cloudant/examples/python/CloudantDF.py      |   5 +-
 .../examples/python/CloudantDFOption.py         |   5 +-
 4 files changed, 143 insertions(+), 182 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-livy-website/blob/561291bf/sql-cloudant/README.md
----------------------------------------------------------------------
diff --git a/sql-cloudant/README.md b/sql-cloudant/README.md
index 98a1c85..eaa8893 100644
--- a/sql-cloudant/README.md
+++ b/sql-cloudant/README.md
@@ -1,24 +1,12 @@
-Spark Cloudant Connector
-================
+A library for reading data from Cloudant or CouchDB databases using Spark SQL and Spark Streaming. 
 
-Cloudant integration with Spark as Spark SQL external datasource, and Spark Streaming as a custom receiver. 
+[IBM® Cloudant®](https://cloudant.com) is a document-oriented Database as a Service (DBaaS). It stores data as documents 
+in JSON format. It's built with scalability, high availability, and durability in mind. It comes with a 
+wide variety of indexing options including map-reduce, Cloudant Query, full-text indexing, and 
+geospatial indexing. The replication capabilities make it easy to keep data in sync between database 
+clusters, desktop PCs, and mobile devices.
 
-
-##  Contents:
-0. [Linking](#Linking)
-1. [Implementation of RelationProvider](#implementation-of-relationProvider)
-2. [Implementation of Receiver](#implementation-of-Receiver)
-3. [Sample applications](#Sample-application)
-    1. [Using SQL In Python](#Using-SQL-In-Python)
-    2. [Using SQL In Scala](#Using-SQL-In-Scala)
-    3. [Using DataFrame In Python](#Using-DataFrame-In-Python)
-    4. [Using DataFrame In Scala](#Using-DataFrame-In-Scala)
-    5. [Using Streams In Scala](#Using-Streams-In-Scala)
-4. [Configuration Overview](#Configuration-Overview)
-5. [Known limitations and areas for improvement](#Known-limitations)
-
-
-<div id='Linking'/>
+[Apache CouchDB™](http://couchdb.apache.org) is open source database software that focuses on ease of use and having an architecture that "completely embraces the Web". It has a document-oriented NoSQL database architecture and is implemented in the concurrency-oriented language Erlang; it uses JSON to store data, JavaScript as its query language using MapReduce, and HTTP for an API.
 
 ## Linking
 
@@ -35,51 +23,96 @@ Using Maven:
     </dependency>
 
 This library can also be added to Spark jobs launched through `spark-shell` or `spark-submit` by using the `--packages` command line option.
-For example, to include it when starting the spark shell:
 
     $ bin/spark-shell --packages org.apache.bahir:spark-sql-cloudant_2.11:2.2.0-SNAPSHOT
 
 Unlike using `--jars`, using `--packages` ensures that this library and its dependencies will be added to the classpath.
 The `--packages` argument can also be used with `bin/spark-submit`.
 
+Submit a job in Python:
+
+    spark-submit --master local[4] --jars <path to cloudant-spark.jar> <path to python script>
+
+Submit a job in Scala:
+
+    spark-submit --class "<your class>" --master local[4] --jars <path to cloudant-spark.jar> <path to your app jar>
+
 This library is compiled for Scala 2.11 only, and intends to support Spark 2.0 onwards.
 
 
-<div id='implementation-of-relationProvider'/>
+## Configuration options
+The configuration is obtained in the following sequence:
 
-### Implementation of RelationProvider
+1. Default values in the Config, which are set in application.conf
+2. Keys in the SparkConf, which are set on the SparkConf
+3. Keys in the parameters, which are set as DataFrame or temporary table options
+4. "spark."+key entries in the SparkConf (treated as if passed through spark-submit using the --conf option)
 
-[DefaultSource.scala](src/main/scala/org/apache/bahir/cloudant/DefaultSource.scala) is a RelationProvider for loading data from Cloudant to Spark, and saving it back from Cloudant to Spark.  It has the following functionalities:
+Each subsequent level overrides the previous one: a configuration set as a DataFrame option overrides what has been set in the SparkConf, and a configuration passed to spark-submit using --conf takes precedence over any setting in the code, as sketched below.
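+
+A minimal sketch of rules 2 and 3 (ACCOUNT, USERNAME, and PASSWORD are placeholders): a `schemaSampleSize` set on the SparkConf applies to every read, while the same key set as a DataFrame option overrides it for that single read.
+
+```python
+from pyspark.sql import SparkSession
+
+spark = SparkSession.builder\
+    .appName("Configuration precedence sketch")\
+    .config("cloudant.host", "ACCOUNT.cloudant.com")\
+    .config("cloudant.username", "USERNAME")\
+    .config("cloudant.password", "PASSWORD")\
+    .config("schemaSampleSize", "-1")\
+    .getOrCreate()
+
+# The DataFrame option below overrides the SparkConf value of schemaSampleSize
+# for this read only
+df = spark.read.format("org.apache.bahir.cloudant")\
+    .option("schemaSampleSize", "20")\
+    .load("n_flight")
+```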
 
-Functionality | Enablement 
---- | ---
-Table Option | database or path, search index, view 
-Scan Type | PrunedFilteredScan 
-Column Pruning | yes
-Predicates Push Down | _id or first predicate 
-Parallel Loading | yes, except with search index
-Insert-able | yes
- 
 
-<div id='implementation-of-Receiver'/>
+### Configuration in application.conf
+Default values are defined [here](cloudant-spark-sql/src/main/resources/application.conf).
+
+### Configuration on SparkConf
 
-### Implementation of Receiver
+Name | Default | Meaning
+--- |:---:| ---
+cloudant.protocol|https|protocol to use to transfer data: http or https
+cloudant.host||cloudant host url
+cloudant.username||cloudant userid
+cloudant.password||cloudant password
+jsonstore.rdd.partitions|10|the intended number of partitions used to drive JsonStoreRDD to load query results in parallel. The actual number is calculated based on the total rows returned, subject to maxInPartition and minInPartition
+jsonstore.rdd.maxInPartition|-1|the max rows in a partition. -1 means unlimited
+jsonstore.rdd.minInPartition|10|the min rows in a partition.
+jsonstore.rdd.requestTimeout|900000| the request timeout in milliseconds
+bulkSize|200| the bulk save size
+schemaSampleSize| "-1" | the sample size for RDD schema discovery. 1 means only the first document is used; -1 means all documents; 0 is treated as 1; any number N means min(N, total) documents
+createDBOnSave|"false"| whether to create a new database during a save operation. If false, the database must already exist. If true, a new database is created; if a database with the provided name already exists, an error is raised.
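+
+For example, `bulkSize` and `createDBOnSave` take effect when saving a DataFrame back to Cloudant. A minimal sketch, adapted from the bundled CloudantDF.py example (`df` is assumed to be an existing DataFrame):
+
+```python
+# Save df to the database "n_flight2", creating the database if it does not exist
+# and writing documents in bulks of 100
+df.write.save("n_flight2", "org.apache.bahir.cloudant",
+    bulkSize="100", createDBOnSave="true")
+```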
 
-Spark Cloudant connector creates a discretized stream in Spark (Spark input DStream) out of Cloudant data sources. [CloudantReceiver.scala](src/main/scala/org/apache/bahir/cloudant/CloudantReceiver.scala) is a custom Receiver that converts `_changes` feed from a Cloudant database to DStream in Spark. This allows all sorts of processing on this streamed data including [using DataFrames and SQL operations on it](examples/src/main/scala/org/apache/spark/examples/sql/cloudant/CloudantStreaming.scala).
+### Configuration on Spark SQL Temporary Table or DataFrame
 
-**NOTE:** Since CloudantReceiver for Spark Streaming is based on `_changes` API, there are some limitations that application developers should be aware of. Firstly, results returned from `_changes` are partially ordered, and may not be presented in order in which documents were updated. Secondly, in case of shards' unavailability, you may see duplicates, changes that have been seen already. Thus, it is up to applications using Spark Streaming with CloudantReceiver to keep track of _changes they have processed and detect duplicates. 
+In addition to the configurations passed through SparkConf, the following can also be set on a temporary table or DataFrame using OPTIONS:
 
+Name | Default | Meaning
+--- |:---:| ---
+database||cloudant database name
+view||cloudant view without the database name. Only used for load.
+index||cloudant search index without the database name. Only used to load data with 200 or fewer results.
+path||cloudant: as database name if database is not present
+schemaSampleSize|"-1"| the sample size used to discover the schema for this temp table. -1 scans all documents
+bulkSize|200| the bulk save size
+createDBOnSave|"false"| whether to create a new database during a save operation. If false, the database must already exist. If true, a new database is created; if a database with the provided name already exists, an error is raised.
 
-<div id='Sample-application'/>
+For fast loading, views are loaded without include_docs. Thus, the derived schema will always be `{id, key, value}`, where `value` can be a compound field. An example of loading data from a view:
 
-## Sample applications
+```python
+spark.sql(" CREATE TEMPORARY TABLE flightTable1 USING org.apache.bahir.cloudant OPTIONS ( database 'n_flight', view '_design/view/_view/AA0')")
 
-<div id='Using-SQL-In-Python'/>
+```
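+
+Similarly, a temporary table can be backed by a Cloudant search index (limited to 200 results, as noted above). A short sketch reusing the index from the bundled CloudantApp.py example, assuming an existing `spark` session (the table name flightTable2 is arbitrary):
+
+```python
+spark.sql(" CREATE TEMPORARY TABLE flightTable2 USING org.apache.bahir.cloudant OPTIONS ( database 'n_flight', index '_design/view/_search/n_flights')")
+```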
 
-### Using SQL In Python 
-	
-[CloudantApp.py](examples/python/CloudantApp.py)
+### Configuration on Cloudant Receiver for Spark Streaming
 
+Name | Default | Meaning
+--- |:---:| ---
+cloudant.host||cloudant host url
+cloudant.username||cloudant userid
+cloudant.password||cloudant password
+database||cloudant database name
+selector| all documents| a selector written in Cloudant Query syntax, specifying conditions for selecting documents. Only documents satisfying the selector's conditions will be retrieved from Cloudant and loaded into Spark.
+
+
+### Configuration in spark-submit using --conf option
+
+The configuration keys listed above can also be set using the `spark-submit --conf` option. When passing a configuration this way, make sure to add "spark." as a prefix to each key (for example, `--conf spark.cloudant.host=ACCOUNT.cloudant.com`).
+
+
+## Examples
+
+### Python API
+
+#### Using SQL In Python 
+	
 ```python
 spark = SparkSession\
     .builder\
@@ -90,21 +123,58 @@ spark = SparkSession\
     .getOrCreate()
 
 
-#### Loading temp table from Cloudant db
+# Loading temp table from Cloudant db
 spark.sql(" CREATE TEMPORARY TABLE airportTable USING org.apache.bahir.cloudant OPTIONS ( database 'n_airportcodemapping')")
 airportData = spark.sql("SELECT _id, airportName FROM airportTable WHERE _id >= 'CAA' AND _id <= 'GAA' ORDER BY _id")
 airportData.printSchema()
 print 'Total # of rows in airportData: ' + str(airportData.count())
 for code in airportData.collect():
     print code._id
-```	
+```
 
-<div id='Using-SQL-In-Scala'/>
+See [CloudantApp.py](examples/python/CloudantApp.py) for examples.
 
-### Using SQL In Scala 
+Submit job example:
+```
+spark-submit  --packages org.apache.bahir:spark-sql-cloudant_2.11:2.2.0-SNAPSHOT --conf spark.cloudant.host=ACCOUNT.cloudant.com --conf spark.cloudant.username=USERNAME --conf spark.cloudant.password=PASSWORD sql-cloudant/examples/python/CloudantApp.py
+```
 
+#### Using DataFrame In Python 
 
-[CloudantApp.scala](examples/src/main/scala/org/apache/spark/examples/sql/cloudant/CloudantApp.scala)
+```python
+spark = SparkSession\
+    .builder\
+    .appName("Cloudant Spark SQL Example in Python using dataframes")\
+    .config("cloudant.host","ACCOUNT.cloudant.com")\
+    .config("cloudant.username", "USERNAME")\
+    .config("cloudant.password","PASSWORD")\
+    .config("jsonstore.rdd.partitions", 8)\
+    .getOrCreate()
+
+# ***1. Loading dataframe from Cloudant db
+df = spark.read.load("n_airportcodemapping", "org.apache.bahir.cloudant")
+df.cache() 
+df.printSchema()
+df.filter(df.airportName >= 'Moscow').select("_id",'airportName').show()
+df.filter(df._id >= 'CAA').select("_id",'airportName').show()	    
+```
+
+See [CloudantDF.py](examples/python/CloudantDF.py) for examples.
+	
+If you perform multiple operations on a DataFrame (select, filter, etc.), you should persist it.
+Otherwise, every operation will load the same data from Cloudant again.
+Persisting also speeds up computation. `df.cache()` persists the DataFrame in memory. Alternatively, for large databases, persist it in memory and on disk:
+
+```python
+from pyspark import StorageLevel
+df.persist(storageLevel = StorageLevel(True, True, False, True, 1))
+```
+
+[Sample code](examples/python/CloudantDFOption.py) shows how to use DataFrame options to define the Cloudant configuration.
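+
+A minimal sketch of the same idea using the standard `DataFrameReader` option API (ACCOUNT, USERNAME, and PASSWORD are placeholders; the bundled example may differ in detail):
+
+```python
+from pyspark.sql import SparkSession
+
+spark = SparkSession.builder\
+    .appName("Cloudant Spark SQL example using DataFrame options")\
+    .getOrCreate()
+
+# Connection settings are passed as per-read options instead of SparkConf entries
+df = spark.read.format("org.apache.bahir.cloudant")\
+    .option("cloudant.host", "ACCOUNT.cloudant.com")\
+    .option("cloudant.username", "USERNAME")\
+    .option("cloudant.password", "PASSWORD")\
+    .load("n_airportcodemapping")
+
+df.printSchema()
+```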
+
+### Scala API
+
+#### Using SQL In Scala 
 
 ```scala
 val spark = SparkSession
@@ -122,7 +192,7 @@ import spark.implicits._
 spark.sql(
     s"""
     |CREATE TEMPORARY TABLE airportTable
-    |USING org.apache.bahir.cloudant.spark
+    |USING org.apache.bahir.cloudant
     |OPTIONS ( database 'n_airportcodemapping')
     """.stripMargin)
 // create a dataframe
@@ -131,52 +201,17 @@ airportData.printSchema()
 println(s"Total # of rows in airportData: " + airportData.count())
 // convert dataframe to array of Rows, and process each row
 airportData.map(t => "code: " + t(0) + ",name:" + t(1)).collect().foreach(println)
-	
-```	
-
-
-<div id='Using-DataFrame-In-Python'/>	
-
-### Using DataFrame In Python 
-
-[CloudantDF.py](examples/python/CloudantDF.py). 
-
-```python	    
-spark = SparkSession\
-    .builder\
-    .appName("Cloudant Spark SQL Example in Python using dataframes")\
-    .config("cloudant.host","ACCOUNT.cloudant.com")\
-    .config("cloudant.username", "USERNAME")\
-    .config("cloudant.password","PASSWORD")\
-    .config("jsonstore.rdd.partitions", 8)\
-    .getOrCreate()
-
-#### Loading dataframe from Cloudant db
-df = spark.read.load("n_airportcodemapping", "org.apache.bahir.cloudant")
-df.cache() 
-df.printSchema()
-df.filter(df.airportName >= 'Moscow').select("_id",'airportName').show()
-df.filter(df._id >= 'CAA').select("_id",'airportName').show()	    
 ```
-	
-In case of doing multiple operations on a dataframe (select, filter etc.),
-you should persist a dataframe. Otherwise, every operation on a dataframe will load the same data from Cloudant again.
-Persisting will also speed up computation. This statement will persist an RDD in memory: `df.cache()`.  Alternatively for large dbs to persist in memory & disk, use: 
-
-```python
-from pyspark import StorageLevel
-df.persist(storageLevel = StorageLevel(True, True, False, True, 1))
-```	
+See [CloudantApp.scala](examples/scala/src/main/scala/mytest/spark/CloudantApp.scala) for examples.
 
-[Sample code on using DataFrame option to define cloudant configuration](examples/python/CloudantDFOption.py)
-
-<div id='Using-DataFrame-In-Scala'/>	
+Submit job example:
+```
+spark-submit --class org.apache.spark.examples.sql.cloudant.CloudantApp --packages org.apache.bahir:spark-sql-cloudant_2.11:2.2.0-SNAPSHOT --conf spark.cloudant.host=ACCOUNT.cloudant.com --conf spark.cloudant.username=USERNAME --conf spark.cloudant.password=PASSWORD  /path/to/spark-sql-cloudant_2.11-2.2.0-SNAPSHOT-tests.jar
+```
 
 ### Using DataFrame In Scala 
 
-[CloudantDF.scala](examples/src/main/scala/org/apache/spark/examples/sql/cloudant/CloudantDF.scala)
-
-```	scala
+```scala
 val spark = SparkSession
       .builder()
       .appName("Cloudant Spark SQL Example with Dataframe")
@@ -199,15 +234,14 @@ val df2 = df.filter(df("flightSegmentId") === "AA106")
     .select("flightSegmentId","economyClassBaseCost")
 df2.show()
 df2.write.format("org.apache.bahir.cloudant").save("n_flight2")
-```	
+```
+
+See [CloudantDF.scala](examples/scala/src/main/scala/mytest/spark/CloudantDF.scala) for examples.
     
- [Sample code on using DataFrame option to define cloudant configuration](examples/src/main/scala/org/apache/spark/examples/sql/cloudant/CloudantDFOption.scala)
- 
+[Sample code](examples/scala/src/main/scala/mytest/spark/CloudantDFOption.scala) shows how to use DataFrame options to define the Cloudant configuration.
  
-<div id='Using-Streams-In-Scala'/>
  
 ### Using Streams In Scala 
-[CloudantStreaming.scala](examples/src/main/scala/org/apache/spark/examples/sql/cloudant/CloudantStreaming.scala)
 
 ```scala
 val ssc = new StreamingContext(sparkConf, Seconds(10))
@@ -235,9 +269,14 @@ ssc.start()
 Thread.sleep(120000L)
 ssc.stop(true)
 	
-```	
+```
 
-By default, Spark Streaming will load all documents from a database. If you want to limit the loading to specific documents, use `selector` option of `CloudantReceiver` and specify your conditions ([CloudantStreamingSelector.scala](examples/src/main/scala/org/apache/spark/examples/sql/cloudant/CloudantStreamingSelector.scala)):
+See [CloudantStreaming.scala](examples/scala/src/main/scala/mytest/spark/CloudantStreaming.scala) for examples.
+
+By default, Spark Streaming will load all documents from a database. If you want to limit the loading to
+specific documents, use the `selector` option of `CloudantReceiver` to specify your conditions
+(see [CloudantStreamingSelector.scala](examples/scala/src/main/scala/mytest/spark/CloudantStreamingSelector.scala)
+for more details):
 
 ```scala
 val changes = ssc.receiverStream(new CloudantReceiver(Map(
@@ -247,78 +286,3 @@ val changes = ssc.receiverStream(new CloudantReceiver(Map(
   "database" -> "sales",
   "selector" -> "{\"month\":\"May\", \"rep\":\"John\"}")))
 ```
-
-
-<div id='Configuration-Overview'/>
-
-## Configuration Overview	
-
-The configuration is obtained in the following sequence:
-
-1. default in the Config, which is set in the application.conf
-2. key in the SparkConf, which is set in SparkConf
-3. key in the parameters, which is set in a dataframe or temporaty table options, or StreamReceiver
-4. "spark."+key in the SparkConf (as they are treated as the one passed in through spark-submit using --conf option)
-
-Here each subsequent configuration overrides the previous one. Thus, configuration set using DataFrame option overrides what has beens set in SparkConf. And configuration passed in spark-submit using --conf takes precedence over any setting in the code. When passing configuration in spark-submit, make sure adding "spark." as prefix to the keys.
-
-
-### Configuration in application.conf
-
-Default values are defined in [here](src/main/resources/application.conf)
-
-### Configuration on SparkConf
-
-Name | Default | Meaning
---- |:---:| ---
-cloudant.protocol|https|protocol to use to transfer data: http or https
-cloudant.host||cloudant host url
-cloudant.username||cloudant userid
-cloudant.password||cloudant password
-jsonstore.rdd.partitions|10|the number of partitions intent used to drive JsonStoreRDD loading query result in parallel. The actual number is calculated based on total rows returned and satisfying maxInPartition and minInPartition
-jsonstore.rdd.maxInPartition|-1|the max rows in a partition. -1 means unlimited
-jsonstore.rdd.minInPartition|10|the min rows in a partition.
-jsonstore.rdd.requestTimeout|900000| the request timeout in milliseconds
-bulkSize|200| the bulk save size
-schemaSampleSize| "-1" | the sample size for RDD schema discovery. 1 means we are using only first document for schema discovery; -1 means all documents; 0 will be treated as 1; any number N means min(N, total) docs 
-createDBOnSave|"false"| whether to create a new database during save operation. If false, a database should already exist. If true, a new database will be created. If true, and a database with a provided name already exists, an error will be raised. 
-
-
-###  Configuration on Spark SQL Temporary Table or DataFrame
-
-Besides overriding any SparkConf configuration, you can also set the following configurations at temporary table or dataframe level.
-
-Name | Default | Meaning
---- |:---:| ---
-database||cloudant database name
-view||cloudant view w/o the database name. only used for load.  
-index||cloudant search index w/o the database name. only used for load data with less than or equal to 200 results.
-path||cloudant: as database name if database is not present
-
-
-#### View Specific
-
-For fast loading, views are loaded without include_docs. Thus, a derived schema will always be: `{id, key, value}`, where `value `can be a compound field. An example of loading data from a view:
-
-```python
-spark.sql(" CREATE TEMPORARY TABLE flightTable1 USING org.apache.bahir.cloudant OPTIONS ( database 'n_flight', view '_design/view/_view/AA0')")
-
-```
-
-###  Configuration on Cloudant Receiver for Spark Streaming
-
-Besides overriding any SparkConf configuration, you can also set the following configurations at stream Receiver level
-
-Name | Default | Meaning
---- |:---:| ---
-database||cloudant database name
-selector| all documents| a selector written in Cloudant Query syntax, specifying conditions for selecting documents. Only documents satisfying the selector's conditions will be retrieved from Cloudant and loaded into Spark.
-
-
-<div id='Known-limitations'/>
-
-## Known limitations and areas for improvement
-
-* Loading data from Cloudant search index will work only for up to 200 results.
-		
-* Need to improve how number of partitions is determined for parallel loading

http://git-wip-us.apache.org/repos/asf/incubator-livy-website/blob/561291bf/sql-cloudant/examples/python/CloudantApp.py
----------------------------------------------------------------------
diff --git a/sql-cloudant/examples/python/CloudantApp.py b/sql-cloudant/examples/python/CloudantApp.py
index 029f39b..c403aeb 100644
--- a/sql-cloudant/examples/python/CloudantApp.py
+++ b/sql-cloudant/examples/python/CloudantApp.py
@@ -14,7 +14,6 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-import pprint
 from pyspark.sql import SparkSession
 
 spark = SparkSession\
@@ -30,16 +29,16 @@ spark = SparkSession\
 spark.sql(" CREATE TEMPORARY TABLE airportTable USING org.apache.bahir.cloudant OPTIONS ( database 'n_airportcodemapping')")
 airportData = spark.sql("SELECT _id, airportName FROM airportTable WHERE _id >= 'CAA' AND _id <= 'GAA' ORDER BY _id")
 airportData.printSchema()
-print 'Total # of rows in airportData: ' + str(airportData.count())
+print('Total # of rows in airportData: ' + str(airportData.count()))
 for code in airportData.collect():
-    print code._id
+    print(code._id)
 
 
 # ***2. Loading temp table from Cloudant search index
-print 'About to test org.apache.bahir.cloudant for flight with index'
+print('About to test org.apache.bahir.cloudant for flight with index')
 spark.sql(" CREATE TEMPORARY TABLE flightTable1 USING org.apache.bahir.cloudant OPTIONS ( database 'n_flight', index '_design/view/_search/n_flights')")
 flightData = spark.sql("SELECT flightSegmentId, scheduledDepartureTime FROM flightTable1 WHERE flightSegmentId >'AA9' AND flightSegmentId<'AA95'")
 flightData.printSchema()
 for code in flightData.collect():
-    print 'Flight {0} on {1}'.format(code.flightSegmentId, code.scheduledDepartureTime)
+    print('Flight {0} on {1}'.format(code.flightSegmentId, code.scheduledDepartureTime))
 

http://git-wip-us.apache.org/repos/asf/incubator-livy-website/blob/561291bf/sql-cloudant/examples/python/CloudantDF.py
----------------------------------------------------------------------
diff --git a/sql-cloudant/examples/python/CloudantDF.py b/sql-cloudant/examples/python/CloudantDF.py
index c009e98..a8af0fa 100644
--- a/sql-cloudant/examples/python/CloudantDF.py
+++ b/sql-cloudant/examples/python/CloudantDF.py
@@ -14,7 +14,6 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-import pprint
 from pyspark.sql import SparkSession
 
 # define cloudant related configuration
@@ -54,7 +53,7 @@ df2.write.save("n_flight2",  "org.apache.bahir.cloudant",
         bulkSize = "100", createDBOnSave="true") 
 total = df.filter(df.flightSegmentId >'AA9').select("flightSegmentId", 
         "scheduledDepartureTime").orderBy(df.flightSegmentId).count()
-print "Total", total, "flights from table"
+print ("Total", total, "flights from table")
 
 
 # ***3. Loading dataframe from a Cloudant search index
@@ -63,7 +62,7 @@ df = spark.read.load(format="org.apache.bahir.cloudant", database="n_flight",
 df.printSchema()
 total = df.filter(df.flightSegmentId >'AA9').select("flightSegmentId", 
         "scheduledDepartureTime").orderBy(df.flightSegmentId).count()
-print "Total", total, "flights from index"
+print ("Total", total, "flights from index")
 
 
 # ***4. Loading dataframe from a Cloudant view

http://git-wip-us.apache.org/repos/asf/incubator-livy-website/blob/561291bf/sql-cloudant/examples/python/CloudantDFOption.py
----------------------------------------------------------------------
diff --git a/sql-cloudant/examples/python/CloudantDFOption.py b/sql-cloudant/examples/python/CloudantDFOption.py
index c045532..a7f5e38 100644
--- a/sql-cloudant/examples/python/CloudantDFOption.py
+++ b/sql-cloudant/examples/python/CloudantDFOption.py
@@ -14,7 +14,6 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-import pprint
 from pyspark.sql import SparkSession
 
 spark = SparkSession\
@@ -55,7 +54,7 @@ df.printSchema()
 total = df.filter(df.flightSegmentId >'AA9') \
     .select("flightSegmentId", "scheduledDepartureTime") \
     .orderBy(df.flightSegmentId).count()
-print "Total", total, "flights from table"
+print ("Total", total, "flights from table")
 
 
 # ***3. Loading dataframe from Cloudant search index
@@ -69,4 +68,4 @@ df.printSchema()
 total = df.filter(df.flightSegmentId >'AA9') \
     .select("flightSegmentId", "scheduledDepartureTime") \
     .orderBy(df.flightSegmentId).count()
-print "Total", total, "flights from index"
+print ("Total", total, "flights from index")