You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@livy.apache.org by lr...@apache.org on 2017/07/20 01:37:16 UTC

[30/50] [abbrv] incubator-livy-website git commit: [BAHIR-101] Spark SQL datasource for CounchDB/Cloudant

[BAHIR-101] Spark SQL datasource for CounchDB/Cloudant

Initial code supporting CounchDB/Cloudant as an Spark SQL
data source. The initial source contains the core connector,
examples, and basic documentation on the README.

Closes #39.


Project: http://git-wip-us.apache.org/repos/asf/incubator-livy-website/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-livy-website/commit/f0d9a84f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-livy-website/tree/f0d9a84f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-livy-website/diff/f0d9a84f

Branch: refs/heads/master
Commit: f0d9a84f76cb34a432e1d2db053d2471a8ab2ba4
Parents: 2ebfd0b
Author: Yang Lei <ge...@gmail.com>
Authored: Wed Mar 29 17:32:51 2017 -0400
Committer: Luciano Resende <lr...@apache.org>
Committed: Fri Mar 31 15:42:54 2017 -0700

----------------------------------------------------------------------
 README.md                                       |   1 +
 pom.xml                                         |  17 +-
 sql-cloudant/README.md                          | 324 +++++++++++++++++++
 sql-cloudant/examples/python/CloudantApp.py     |  45 +++
 sql-cloudant/examples/python/CloudantDF.py      |  75 +++++
 .../examples/python/CloudantDFOption.py         |  72 +++++
 .../examples/sql/cloudant/CloudantApp.scala     |  73 +++++
 .../examples/sql/cloudant/CloudantDF.scala      |  64 ++++
 .../sql/cloudant/CloudantDFOption.scala         |  71 ++++
 .../sql/cloudant/CloudantStreaming.scala        |  99 ++++++
 .../cloudant/CloudantStreamingSelector.scala    |  64 ++++
 sql-cloudant/pom.xml                            | 115 +++++++
 .../src/main/resources/application.conf         |  14 +
 sql-cloudant/src/main/resources/reference.conf  |   0
 .../apache/bahir/cloudant/CloudantConfig.scala  | 273 ++++++++++++++++
 .../bahir/cloudant/CloudantReceiver.scala       |  90 ++++++
 .../apache/bahir/cloudant/DefaultSource.scala   | 159 +++++++++
 .../bahir/cloudant/common/FilterUtil.scala      | 149 +++++++++
 .../common/JsonStoreConfigManager.scala         | 212 ++++++++++++
 .../cloudant/common/JsonStoreDataAccess.scala   | 272 ++++++++++++++++
 .../bahir/cloudant/common/JsonStoreRDD.scala    | 106 ++++++
 .../apache/bahir/cloudant/common/JsonUtil.scala |  42 +++
 22 files changed, 2336 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-livy-website/blob/f0d9a84f/README.md
----------------------------------------------------------------------
diff --git a/README.md b/README.md
index ff8599b..ebbaea7 100644
--- a/README.md
+++ b/README.md
@@ -52,6 +52,7 @@ Currently, each submodule has its own README.md, with information on example usa
 * [Streaming Mqtt](https://github.com/apache/bahir/blob/master/streaming-mqtt/README.md)
 * [Streaming Zeromq](https://github.com/apache/bahir/blob/master/streaming-zeromq/README.md)
 * [Streaming Twitter](https://github.com/apache/bahir/blob/master/streaming-twitter/README.md)
+* [SQL Cloudant](sql-cloudant/README.md)
 
 Furthermore, to generate scaladocs for each module:
 

http://git-wip-us.apache.org/repos/asf/incubator-livy-website/blob/f0d9a84f/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index f9ee4a0..73cac1f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -75,6 +75,7 @@
   </mailingLists>
 
   <modules>
+    <module>sql-cloudant</module>
     <module>streaming-akka</module>
     <module>streaming-mqtt</module>
     <module>sql-streaming-mqtt</module>
@@ -162,7 +163,7 @@
       </snapshots>
     </pluginRepository>
   </pluginRepositories>
-  
+
   <dependencies>
     <!--
       This is a dummy dependency that is used to trigger the maven-shade plugin so that Spark's
@@ -290,6 +291,12 @@
       </dependency>
 
       <dependency>
+        <groupId>com.typesafe.play</groupId>
+        <artifactId>play-json_${scala.binary.version}</artifactId>
+        <version>2.5.9</version>
+      </dependency>
+
+      <dependency>
         <groupId>org.json4s</groupId>
         <artifactId>json4s-jackson_${scala.binary.version}</artifactId>
         <version>3.2.11</version>
@@ -301,6 +308,12 @@
         <version>${jsr305.version}</version>
       </dependency>
 
+      <dependency>
+        <groupId>org.scalaj</groupId>
+        <artifactId>scalaj-http_${scala.binary.version}</artifactId>
+        <version>2.3.0</version>
+      </dependency>
+
       <!-- Scala Related Dependencies -->
       <dependency>
         <groupId>org.scala-lang</groupId>
@@ -450,6 +463,8 @@
               <exclude>**/README.md</exclude>
               <exclude>**/examples/data/*.txt</exclude>
               <exclude>**/*.iml</exclude>
+              <exclude>**/src/main/resources/application.conf</exclude>
+              <exclude>**/src/main/resources/reference.conf</exclude>
             </excludes>
           </configuration>
         </plugin>

http://git-wip-us.apache.org/repos/asf/incubator-livy-website/blob/f0d9a84f/sql-cloudant/README.md
----------------------------------------------------------------------
diff --git a/sql-cloudant/README.md b/sql-cloudant/README.md
new file mode 100644
index 0000000..98a1c85
--- /dev/null
+++ b/sql-cloudant/README.md
@@ -0,0 +1,324 @@
+Spark Cloudant Connector
+================
+
+Cloudant integration with Spark as Spark SQL external datasource, and Spark Streaming as a custom receiver. 
+
+
+##  Contents:
+0. [Linking](#Linking)
+1. [Implementation of RelationProvider](#implementation-of-relationProvider)
+2. [Implementation of Receiver](#implementation-of-Receiver)
+3. [Sample applications](#Sample-application)
+    1. [Using SQL In Python](#Using-SQL-In-Python)
+    2. [Using SQL In Scala](#Using-SQL-In-Scala)
+    3. [Using DataFrame In Python](#Using-DataFrame-In-Python)
+    4. [Using DataFrame In Scala](#Using-DataFrame-In-Scala)
+    5. [Using Streams In Scala](#Using-Streams-In-Scala)
+4. [Configuration Overview](#Configuration-Overview)
+5. [Known limitations and areas for improvement](#Known-limitations)
+
+
+<div id='Linking'/>
+
+## Linking
+
+Using SBT:
+
+    libraryDependencies += "org.apache.bahir" %% "spark-sql-cloudant" % "2.2.0-SNAPSHOT"
+
+Using Maven:
+
+    <dependency>
+        <groupId>org.apache.bahir</groupId>
+        <artifactId>spark-sql-cloudant_2.11</artifactId>
+        <version>2.2.0-SNAPSHOT</version>
+    </dependency>
+
+This library can also be added to Spark jobs launched through `spark-shell` or `spark-submit` by using the `--packages` command line option.
+For example, to include it when starting the spark shell:
+
+    $ bin/spark-shell --packages org.apache.bahir:spark-sql-cloudant_2.11:2.2.0-SNAPSHOT
+
+Unlike using `--jars`, using `--packages` ensures that this library and its dependencies will be added to the classpath.
+The `--packages` argument can also be used with `bin/spark-submit`.
+
+This library is compiled for Scala 2.11 only, and intends to support Spark 2.0 onwards.
+
+
+<div id='implementation-of-relationProvider'/>
+
+### Implementation of RelationProvider
+
+[DefaultSource.scala](src/main/scala/org/apache/bahir/cloudant/DefaultSource.scala) is a RelationProvider for loading data from Cloudant to Spark, and saving it back from Cloudant to Spark.  It has the following functionalities:
+
+Functionality | Enablement 
+--- | ---
+Table Option | database or path, search index, view 
+Scan Type | PrunedFilteredScan 
+Column Pruning | yes
+Predicates Push Down | _id or first predicate 
+Parallel Loading | yes, except with search index
+Insert-able | yes
+ 
+
+<div id='implementation-of-Receiver'/>
+
+### Implementation of Receiver
+
+Spark Cloudant connector creates a discretized stream in Spark (Spark input DStream) out of Cloudant data sources. [CloudantReceiver.scala](src/main/scala/org/apache/bahir/cloudant/CloudantReceiver.scala) is a custom Receiver that converts `_changes` feed from a Cloudant database to DStream in Spark. This allows all sorts of processing on this streamed data including [using DataFrames and SQL operations on it](examples/src/main/scala/org/apache/spark/examples/sql/cloudant/CloudantStreaming.scala).
+
+**NOTE:** Since CloudantReceiver for Spark Streaming is based on `_changes` API, there are some limitations that application developers should be aware of. Firstly, results returned from `_changes` are partially ordered, and may not be presented in order in which documents were updated. Secondly, in case of shards' unavailability, you may see duplicates, changes that have been seen already. Thus, it is up to applications using Spark Streaming with CloudantReceiver to keep track of _changes they have processed and detect duplicates. 
+
+
+<div id='Sample-application'/>
+
+## Sample applications
+
+<div id='Using-SQL-In-Python'/>
+
+### Using SQL In Python 
+	
+[CloudantApp.py](examples/python/CloudantApp.py)
+
+```python
+spark = SparkSession\
+    .builder\
+    .appName("Cloudant Spark SQL Example in Python using temp tables")\
+    .config("cloudant.host","ACCOUNT.cloudant.com")\
+    .config("cloudant.username", "USERNAME")\
+    .config("cloudant.password","PASSWORD")\
+    .getOrCreate()
+
+
+#### Loading temp table from Cloudant db
+spark.sql(" CREATE TEMPORARY TABLE airportTable USING org.apache.bahir.cloudant OPTIONS ( database 'n_airportcodemapping')")
+airportData = spark.sql("SELECT _id, airportName FROM airportTable WHERE _id >= 'CAA' AND _id <= 'GAA' ORDER BY _id")
+airportData.printSchema()
+print 'Total # of rows in airportData: ' + str(airportData.count())
+for code in airportData.collect():
+    print code._id
+```	
+
+<div id='Using-SQL-In-Scala'/>
+
+### Using SQL In Scala 
+
+
+[CloudantApp.scala](examples/src/main/scala/org/apache/spark/examples/sql/cloudant/CloudantApp.scala)
+
+```scala
+val spark = SparkSession
+      .builder()
+      .appName("Cloudant Spark SQL Example")
+      .config("cloudant.host","ACCOUNT.cloudant.com")
+      .config("cloudant.username", "USERNAME")
+      .config("cloudant.password","PASSWORD")
+      .getOrCreate()
+
+// For implicit conversions of Dataframe to RDDs
+import spark.implicits._
+    
+// create a temp table from Cloudant db and query it using sql syntax
+spark.sql(
+    s"""
+    |CREATE TEMPORARY TABLE airportTable
+    |USING org.apache.bahir.cloudant.spark
+    |OPTIONS ( database 'n_airportcodemapping')
+    """.stripMargin)
+// create a dataframe
+val airportData = spark.sql("SELECT _id, airportName FROM airportTable WHERE _id >= 'CAA' AND _id <= 'GAA' ORDER BY _id")
+airportData.printSchema()
+println(s"Total # of rows in airportData: " + airportData.count())
+// convert dataframe to array of Rows, and process each row
+airportData.map(t => "code: " + t(0) + ",name:" + t(1)).collect().foreach(println)
+	
+```	
+
+
+<div id='Using-DataFrame-In-Python'/>	
+
+### Using DataFrame In Python 
+
+[CloudantDF.py](examples/python/CloudantDF.py). 
+
+```python	    
+spark = SparkSession\
+    .builder\
+    .appName("Cloudant Spark SQL Example in Python using dataframes")\
+    .config("cloudant.host","ACCOUNT.cloudant.com")\
+    .config("cloudant.username", "USERNAME")\
+    .config("cloudant.password","PASSWORD")\
+    .config("jsonstore.rdd.partitions", 8)\
+    .getOrCreate()
+
+#### Loading dataframe from Cloudant db
+df = spark.read.load("n_airportcodemapping", "org.apache.bahir.cloudant")
+df.cache() 
+df.printSchema()
+df.filter(df.airportName >= 'Moscow').select("_id",'airportName').show()
+df.filter(df._id >= 'CAA').select("_id",'airportName').show()	    
+```
+	
+In case of doing multiple operations on a dataframe (select, filter etc.),
+you should persist a dataframe. Otherwise, every operation on a dataframe will load the same data from Cloudant again.
+Persisting will also speed up computation. This statement will persist an RDD in memory: `df.cache()`.  Alternatively for large dbs to persist in memory & disk, use: 
+
+```python
+from pyspark import StorageLevel
+df.persist(storageLevel = StorageLevel(True, True, False, True, 1))
+```	
+
+[Sample code on using DataFrame option to define cloudant configuration](examples/python/CloudantDFOption.py)
+
+<div id='Using-DataFrame-In-Scala'/>	
+
+### Using DataFrame In Scala 
+
+[CloudantDF.scala](examples/src/main/scala/org/apache/spark/examples/sql/cloudant/CloudantDF.scala)
+
+```	scala
+val spark = SparkSession
+      .builder()
+      .appName("Cloudant Spark SQL Example with Dataframe")
+      .config("cloudant.host","ACCOUNT.cloudant.com")
+      .config("cloudant.username", "USERNAME")
+      .config("cloudant.password","PASSWORD")
+      .config("createDBOnSave","true") // to create a db on save
+      .config("jsonstore.rdd.partitions", "20") // using 20 partitions
+      .getOrCreate()
+          
+// 1. Loading data from Cloudant db
+val df = spark.read.format("org.apache.bahir.cloudant").load("n_flight")
+// Caching df in memory to speed computations
+// and not to retrieve data from cloudant again
+df.cache() 
+df.printSchema()
+
+// 2. Saving dataframe to Cloudant db
+val df2 = df.filter(df("flightSegmentId") === "AA106")
+    .select("flightSegmentId","economyClassBaseCost")
+df2.show()
+df2.write.format("org.apache.bahir.cloudant").save("n_flight2")
+```	
+    
+ [Sample code on using DataFrame option to define cloudant configuration](examples/src/main/scala/org/apache/spark/examples/sql/cloudant/CloudantDFOption.scala)
+ 
+ 
+<div id='Using-Streams-In-Scala'/>
+ 
+### Using Streams In Scala 
+[CloudantStreaming.scala](examples/src/main/scala/org/apache/spark/examples/sql/cloudant/CloudantStreaming.scala)
+
+```scala
+val ssc = new StreamingContext(sparkConf, Seconds(10))
+val changes = ssc.receiverStream(new CloudantReceiver(Map(
+  "cloudant.host" -> "ACCOUNT.cloudant.com",
+  "cloudant.username" -> "USERNAME",
+  "cloudant.password" -> "PASSWORD",
+  "database" -> "n_airportcodemapping")))
+
+changes.foreachRDD((rdd: RDD[String], time: Time) => {
+  // Get the singleton instance of SparkSession
+  val spark = SparkSessionSingleton.getInstance(rdd.sparkContext.getConf)
+
+  println(s"========= $time =========")
+  // Convert RDD[String] to DataFrame
+  val changesDataFrame = spark.read.json(rdd)
+  if (!changesDataFrame.schema.isEmpty) {
+    changesDataFrame.printSchema()
+    changesDataFrame.select("*").show()
+    ....
+  }
+})
+ssc.start()
+// run streaming for 120 secs
+Thread.sleep(120000L)
+ssc.stop(true)
+	
+```	
+
+By default, Spark Streaming will load all documents from a database. If you want to limit the loading to specific documents, use `selector` option of `CloudantReceiver` and specify your conditions ([CloudantStreamingSelector.scala](examples/src/main/scala/org/apache/spark/examples/sql/cloudant/CloudantStreamingSelector.scala)):
+
+```scala
+val changes = ssc.receiverStream(new CloudantReceiver(Map(
+  "cloudant.host" -> "ACCOUNT.cloudant.com",
+  "cloudant.username" -> "USERNAME",
+  "cloudant.password" -> "PASSWORD",
+  "database" -> "sales",
+  "selector" -> "{\"month\":\"May\", \"rep\":\"John\"}")))
+```
+
+
+<div id='Configuration-Overview'/>
+
+## Configuration Overview	
+
+The configuration is obtained in the following sequence:
+
+1. default in the Config, which is set in the application.conf
+2. key in the SparkConf, which is set in SparkConf
+3. key in the parameters, which is set in a dataframe or temporaty table options, or StreamReceiver
+4. "spark."+key in the SparkConf (as they are treated as the one passed in through spark-submit using --conf option)
+
+Here each subsequent configuration overrides the previous one. Thus, configuration set using DataFrame option overrides what has beens set in SparkConf. And configuration passed in spark-submit using --conf takes precedence over any setting in the code. When passing configuration in spark-submit, make sure adding "spark." as prefix to the keys.
+
+
+### Configuration in application.conf
+
+Default values are defined in [here](src/main/resources/application.conf)
+
+### Configuration on SparkConf
+
+Name | Default | Meaning
+--- |:---:| ---
+cloudant.protocol|https|protocol to use to transfer data: http or https
+cloudant.host||cloudant host url
+cloudant.username||cloudant userid
+cloudant.password||cloudant password
+jsonstore.rdd.partitions|10|the number of partitions intent used to drive JsonStoreRDD loading query result in parallel. The actual number is calculated based on total rows returned and satisfying maxInPartition and minInPartition
+jsonstore.rdd.maxInPartition|-1|the max rows in a partition. -1 means unlimited
+jsonstore.rdd.minInPartition|10|the min rows in a partition.
+jsonstore.rdd.requestTimeout|900000| the request timeout in milliseconds
+bulkSize|200| the bulk save size
+schemaSampleSize| "-1" | the sample size for RDD schema discovery. 1 means we are using only first document for schema discovery; -1 means all documents; 0 will be treated as 1; any number N means min(N, total) docs 
+createDBOnSave|"false"| whether to create a new database during save operation. If false, a database should already exist. If true, a new database will be created. If true, and a database with a provided name already exists, an error will be raised. 
+
+
+###  Configuration on Spark SQL Temporary Table or DataFrame
+
+Besides overriding any SparkConf configuration, you can also set the following configurations at temporary table or dataframe level.
+
+Name | Default | Meaning
+--- |:---:| ---
+database||cloudant database name
+view||cloudant view w/o the database name. only used for load.  
+index||cloudant search index w/o the database name. only used for load data with less than or equal to 200 results.
+path||cloudant: as database name if database is not present
+
+
+#### View Specific
+
+For fast loading, views are loaded without include_docs. Thus, a derived schema will always be: `{id, key, value}`, where `value `can be a compound field. An example of loading data from a view:
+
+```python
+spark.sql(" CREATE TEMPORARY TABLE flightTable1 USING org.apache.bahir.cloudant OPTIONS ( database 'n_flight', view '_design/view/_view/AA0')")
+
+```
+
+###  Configuration on Cloudant Receiver for Spark Streaming
+
+Besides overriding any SparkConf configuration, you can also set the following configurations at stream Receiver level
+
+Name | Default | Meaning
+--- |:---:| ---
+database||cloudant database name
+selector| all documents| a selector written in Cloudant Query syntax, specifying conditions for selecting documents. Only documents satisfying the selector's conditions will be retrieved from Cloudant and loaded into Spark.
+
+
+<div id='Known-limitations'/>
+
+## Known limitations and areas for improvement
+
+* Loading data from Cloudant search index will work only for up to 200 results.
+		
+* Need to improve how number of partitions is determined for parallel loading

http://git-wip-us.apache.org/repos/asf/incubator-livy-website/blob/f0d9a84f/sql-cloudant/examples/python/CloudantApp.py
----------------------------------------------------------------------
diff --git a/sql-cloudant/examples/python/CloudantApp.py b/sql-cloudant/examples/python/CloudantApp.py
new file mode 100644
index 0000000..029f39b
--- /dev/null
+++ b/sql-cloudant/examples/python/CloudantApp.py
@@ -0,0 +1,45 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import pprint
+from pyspark.sql import SparkSession
+
+spark = SparkSession\
+    .builder\
+    .appName("Cloudant Spark SQL Example in Python using temp tables")\
+    .config("cloudant.host","ACCOUNT.cloudant.com")\
+    .config("cloudant.username", "USERNAME")\
+    .config("cloudant.password","PASSWORD")\
+    .getOrCreate()
+
+
+# ***1. Loading temp table from Cloudant db
+spark.sql(" CREATE TEMPORARY TABLE airportTable USING org.apache.bahir.cloudant OPTIONS ( database 'n_airportcodemapping')")
+airportData = spark.sql("SELECT _id, airportName FROM airportTable WHERE _id >= 'CAA' AND _id <= 'GAA' ORDER BY _id")
+airportData.printSchema()
+print 'Total # of rows in airportData: ' + str(airportData.count())
+for code in airportData.collect():
+    print code._id
+
+
+# ***2. Loading temp table from Cloudant search index
+print 'About to test org.apache.bahir.cloudant for flight with index'
+spark.sql(" CREATE TEMPORARY TABLE flightTable1 USING org.apache.bahir.cloudant OPTIONS ( database 'n_flight', index '_design/view/_search/n_flights')")
+flightData = spark.sql("SELECT flightSegmentId, scheduledDepartureTime FROM flightTable1 WHERE flightSegmentId >'AA9' AND flightSegmentId<'AA95'")
+flightData.printSchema()
+for code in flightData.collect():
+    print 'Flight {0} on {1}'.format(code.flightSegmentId, code.scheduledDepartureTime)
+

http://git-wip-us.apache.org/repos/asf/incubator-livy-website/blob/f0d9a84f/sql-cloudant/examples/python/CloudantDF.py
----------------------------------------------------------------------
diff --git a/sql-cloudant/examples/python/CloudantDF.py b/sql-cloudant/examples/python/CloudantDF.py
new file mode 100644
index 0000000..c009e98
--- /dev/null
+++ b/sql-cloudant/examples/python/CloudantDF.py
@@ -0,0 +1,75 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import pprint
+from pyspark.sql import SparkSession
+
+# define cloudant related configuration
+# set protocol to http if needed, default value=https
+# config("cloudant.protocol","http")
+spark = SparkSession\
+    .builder\
+    .appName("Cloudant Spark SQL Example in Python using dataframes")\
+    .config("cloudant.host","ACCOUNT.cloudant.com")\
+    .config("cloudant.username", "USERNAME")\
+    .config("cloudant.password","PASSWORD")\
+    .config("jsonstore.rdd.partitions", 8)\
+    .getOrCreate()
+
+
+# ***1. Loading dataframe from Cloudant db
+df = spark.read.load("n_airportcodemapping", "org.apache.bahir.cloudant")
+# In case of doing multiple operations on a dataframe (select, filter etc.)
+# you should persist the dataframe.
+# Othewise, every operation on the dataframe will load the same data from Cloudant again.
+# Persisting will also speed up computation.
+df.cache() # persisting in memory
+# alternatively for large dbs to persist in memory & disk:
+# from pyspark import StorageLevel
+# df.persist(storageLevel = StorageLevel(True, True, False, True, 1)) 
+df.printSchema()
+df.filter(df.airportName >= 'Moscow').select("_id",'airportName').show()
+df.filter(df._id >= 'CAA').select("_id",'airportName').show()
+
+
+# ***2. Saving a datafram to Cloudant db
+df = spark.read.load(format="org.apache.bahir.cloudant", database="n_flight")
+df.printSchema()
+df2 = df.filter(df.flightSegmentId=='AA106')\
+    .select("flightSegmentId", "economyClassBaseCost")
+df2.write.save("n_flight2",  "org.apache.bahir.cloudant",
+        bulkSize = "100", createDBOnSave="true") 
+total = df.filter(df.flightSegmentId >'AA9').select("flightSegmentId", 
+        "scheduledDepartureTime").orderBy(df.flightSegmentId).count()
+print "Total", total, "flights from table"
+
+
+# ***3. Loading dataframe from a Cloudant search index
+df = spark.read.load(format="org.apache.bahir.cloudant", database="n_flight", 
+        index="_design/view/_search/n_flights")
+df.printSchema()
+total = df.filter(df.flightSegmentId >'AA9').select("flightSegmentId", 
+        "scheduledDepartureTime").orderBy(df.flightSegmentId).count()
+print "Total", total, "flights from index"
+
+
+# ***4. Loading dataframe from a Cloudant view
+df = spark.read.load(format="org.apache.bahir.cloudant", path="n_flight", 
+        view="_design/view/_view/AA0", schemaSampleSize="20")
+# schema for view will always be: _id, key, value
+# where value can be a complex field
+df.printSchema()
+df.show()

http://git-wip-us.apache.org/repos/asf/incubator-livy-website/blob/f0d9a84f/sql-cloudant/examples/python/CloudantDFOption.py
----------------------------------------------------------------------
diff --git a/sql-cloudant/examples/python/CloudantDFOption.py b/sql-cloudant/examples/python/CloudantDFOption.py
new file mode 100644
index 0000000..c045532
--- /dev/null
+++ b/sql-cloudant/examples/python/CloudantDFOption.py
@@ -0,0 +1,72 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import pprint
+from pyspark.sql import SparkSession
+
+spark = SparkSession\
+    .builder\
+    .appName("Cloudant Spark SQL Example in Python using dataframes with options")\
+    .getOrCreate()
+
+cloudant_host = "ACCOUNT.cloudant.com"
+cloudant_username = "USERNAME"
+cloudant_password = "PASSWORD"
+
+# ***1. Loading dataframe from Cloudant db
+df = spark.read.format("org.apache.bahir.cloudant") \
+    .option("cloudant.host", cloudant_host) \
+    .option("cloudant.username", cloudant_username) \
+    .option("cloudant.password", cloudant_password) \
+    .load("n_airportcodemapping")
+df.cache() # persisting in memory
+df.printSchema()
+df.filter(df._id >= 'CAA').select("_id",'airportName').show()
+
+
+# ***2.Saving dataframe to Cloudant db
+df.filter(df._id >= 'CAA').select("_id",'airportName') \
+    .write.format("org.apache.bahir.cloudant") \
+    .option("cloudant.host", cloudant_host) \
+    .option("cloudant.username", cloudant_username) \
+    .option("cloudant.password",cloudant_password) \
+    .option("bulkSize","100") \
+    .option("createDBOnSave", "true") \
+    .save("airportcodemapping_df")
+df = spark.read.format("org.apache.bahir.cloudant") \
+    .option("cloudant.host", cloudant_host) \
+    .option("cloudant.username", cloudant_username) \
+    .option("cloudant.password", cloudant_password) \
+    .load("n_flight")
+df.printSchema()
+total = df.filter(df.flightSegmentId >'AA9') \
+    .select("flightSegmentId", "scheduledDepartureTime") \
+    .orderBy(df.flightSegmentId).count()
+print "Total", total, "flights from table"
+
+
+# ***3. Loading dataframe from Cloudant search index
+df = spark.read.format("org.apache.bahir.cloudant") \
+    .option("cloudant.host",cloudant_host) \
+    .option("cloudant.username",cloudant_username) \
+    .option("cloudant.password",cloudant_password) \
+    .option("index","_design/view/_search/n_flights").load("n_flight")
+df.printSchema()
+
+total = df.filter(df.flightSegmentId >'AA9') \
+    .select("flightSegmentId", "scheduledDepartureTime") \
+    .orderBy(df.flightSegmentId).count()
+print "Total", total, "flights from index"

http://git-wip-us.apache.org/repos/asf/incubator-livy-website/blob/f0d9a84f/sql-cloudant/examples/src/main/scala/org/apache/spark/examples/sql/cloudant/CloudantApp.scala
----------------------------------------------------------------------
diff --git a/sql-cloudant/examples/src/main/scala/org/apache/spark/examples/sql/cloudant/CloudantApp.scala b/sql-cloudant/examples/src/main/scala/org/apache/spark/examples/sql/cloudant/CloudantApp.scala
new file mode 100644
index 0000000..d3e5ecc
--- /dev/null
+++ b/sql-cloudant/examples/src/main/scala/org/apache/spark/examples/sql/cloudant/CloudantApp.scala
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.examples.sql.cloudant
+
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.SQLContext
+
+object CloudantApp {
+  def main(args: Array[String]) {
+    val spark = SparkSession
+      .builder()
+      .appName("Cloudant Spark SQL Example")
+      .config("cloudant.host", "ACCOUNT.cloudant.com")
+      .config("cloudant.username", "USERNAME")
+      .config("cloudant.password", "PASSWORD")
+      .getOrCreate()
+
+    // For implicit conversions of Dataframe to RDDs
+    import spark.implicits._
+
+    // create a temp table from Cloudant db and query it using sql syntax
+    spark.sql(
+        s"""
+        |CREATE TEMPORARY TABLE airportTable
+        |USING org.apache.bahir.cloudant
+        |OPTIONS ( database 'n_airportcodemapping')
+        """.stripMargin)
+    // create a dataframe
+    val airportData = spark.sql(
+        s"""
+        |SELECT _id, airportName
+        |FROM airportTable
+        |WHERE _id >= 'CAA' AND _id <= 'GAA' ORDER BY _id
+        """.stripMargin)
+    airportData.printSchema()
+    println(s"Total # of rows in airportData: " + airportData.count()) // scalastyle:ignore
+    // convert dataframe to array of Rows, and process each row
+    airportData.map(t => "code: " + t(0) + ",name:" + t(1)).collect().foreach(println) // scalastyle:ignore
+
+    // create a temp table from Cloudant index  and query it using sql syntax
+    spark.sql(
+        s"""
+        |CREATE TEMPORARY TABLE flightTable
+        |USING org.apache.bahir.cloudant
+        |OPTIONS (database 'n_flight', index '_design/view/_search/n_flights')
+        """.stripMargin)
+    val flightData = spark.sql(
+        s"""
+        |SELECT flightSegmentId, scheduledDepartureTime
+        |FROM flightTable
+        |WHERE flightSegmentId >'AA9' AND flightSegmentId<'AA95'
+        """.stripMargin)
+    flightData.printSchema()
+    flightData.map(t => "flightSegmentId: " + t(0) + ", scheduledDepartureTime: " + t(1))
+                   .collect().foreach(println) // scalastyle:ignore
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-livy-website/blob/f0d9a84f/sql-cloudant/examples/src/main/scala/org/apache/spark/examples/sql/cloudant/CloudantDF.scala
----------------------------------------------------------------------
diff --git a/sql-cloudant/examples/src/main/scala/org/apache/spark/examples/sql/cloudant/CloudantDF.scala b/sql-cloudant/examples/src/main/scala/org/apache/spark/examples/sql/cloudant/CloudantDF.scala
new file mode 100644
index 0000000..d97b688
--- /dev/null
+++ b/sql-cloudant/examples/src/main/scala/org/apache/spark/examples/sql/cloudant/CloudantDF.scala
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.examples.sql.cloudant
+
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.SQLContext
+import org.apache.spark.storage.StorageLevel
+
+object CloudantDF{
+  def main(args: Array[String]) {
+    val spark = SparkSession
+      .builder()
+      .appName("Cloudant Spark SQL Example with Dataframe")
+      .config("cloudant.host", "ACCOUNT.cloudant.com")
+      .config("cloudant.username", "USERNAME")
+      .config("cloudant.password", "PASSWORD")
+      .config("createDBOnSave", "true") // to create a db on save
+      .config("jsonstore.rdd.partitions", "20") // using 20 partitions
+      .getOrCreate()
+
+    // 1. Loading data from Cloudant db
+    val df = spark.read.format("org.apache.bahir.cloudant").load("n_flight")
+    // Caching df in memory to speed computations
+    // and not to retrieve data from cloudant again
+    df.cache()
+    df.printSchema()
+
+    // 2. Saving dataframe to Cloudant db
+    val df2 = df.filter(df("flightSegmentId") === "AA106")
+        .select("flightSegmentId", "economyClassBaseCost")
+    df2.show()
+    df2.write.format("org.apache.bahir.cloudant").save("n_flight2")
+
+    // 3. Loading data from Cloudant search index
+    val df3 = spark.read.format("org.apache.bahir.cloudant")
+      .option("index", "_design/view/_search/n_flights").load("n_flight")
+    val total = df3.filter(df3("flightSegmentId") >"AA9")
+      .select("flightSegmentId", "scheduledDepartureTime")
+      .orderBy(df3("flightSegmentId")).count()
+    println(s"Total $total flights from index") // scalastyle:ignore
+
+    // 4. Loading data from view
+    val df4 = spark.read.format("org.apache.bahir.cloudant")
+      .option("view", "_design/view/_view/AA0").load("n_flight")
+    df4.printSchema()
+    df4.show()
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-livy-website/blob/f0d9a84f/sql-cloudant/examples/src/main/scala/org/apache/spark/examples/sql/cloudant/CloudantDFOption.scala
----------------------------------------------------------------------
diff --git a/sql-cloudant/examples/src/main/scala/org/apache/spark/examples/sql/cloudant/CloudantDFOption.scala b/sql-cloudant/examples/src/main/scala/org/apache/spark/examples/sql/cloudant/CloudantDFOption.scala
new file mode 100644
index 0000000..164ca21
--- /dev/null
+++ b/sql-cloudant/examples/src/main/scala/org/apache/spark/examples/sql/cloudant/CloudantDFOption.scala
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.examples.sql.cloudant
+
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.SQLContext
+import org.apache.spark.storage.StorageLevel
+
+object CloudantDFOption{
+  def main(args: Array[String]) {
+    val spark = SparkSession
+      .builder()
+      .appName("Cloudant Spark SQL Example with Dataframe using Option")
+      .getOrCreate()
+
+    val cloudantHost = "ACCOUNT.cloudant.com"
+    val cloudantUser = "USERNAME"
+    val cloudantPassword = "PASSWORD"
+
+    // 1. Loading data from Cloudant db
+    val df = spark.read.format("org.apache.bahir.cloudant")
+      .option("cloudant.host", cloudantHost)
+      .option("cloudant.username", cloudantUser)
+      .option("cloudant.password", cloudantPassword)
+      .load("n_airportcodemapping")
+
+    df.cache()
+    df.printSchema()
+    df.filter(df("_id") >= "CAA").select("_id", "airportName").show()
+
+    // 2. Saving dataframe to Cloudant db
+    // To create a Cloudant db during save set the option createDBOnSave=true
+    df.filter(df("_id") >= "CAA")
+      .select("_id", "airportName")
+      .write.format("org.apache.bahir.cloudant")
+      .option("cloudant.host", cloudantHost)
+      .option("cloudant.username", cloudantUser)
+      .option("cloudant.password", cloudantPassword)
+      .option("createDBOnSave", "true")
+      .save("airportcodemapping_df")
+
+    // 3. Loading data from Cloudant search index
+    val df2 = spark.read.format("org.apache.bahir.cloudant")
+      .option("index", "_design/view/_search/n_flights")
+      .option("cloudant.host", cloudantHost)
+      .option("cloudant.username", cloudantUser)
+      .option("cloudant.password", cloudantPassword)
+      .load("n_flight")
+    val total2 = df2.filter(df2("flightSegmentId") >"AA9")
+      .select("flightSegmentId", "scheduledDepartureTime")
+      .orderBy(df2("flightSegmentId"))
+      .count()
+    println(s"Total $total2 flights from index")// scalastyle:ignore
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-livy-website/blob/f0d9a84f/sql-cloudant/examples/src/main/scala/org/apache/spark/examples/sql/cloudant/CloudantStreaming.scala
----------------------------------------------------------------------
diff --git a/sql-cloudant/examples/src/main/scala/org/apache/spark/examples/sql/cloudant/CloudantStreaming.scala b/sql-cloudant/examples/src/main/scala/org/apache/spark/examples/sql/cloudant/CloudantStreaming.scala
new file mode 100644
index 0000000..a1de696
--- /dev/null
+++ b/sql-cloudant/examples/src/main/scala/org/apache/spark/examples/sql/cloudant/CloudantStreaming.scala
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.examples.sql.cloudant
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.streaming.{ Seconds, StreamingContext, Time }
+import org.apache.spark.streaming.scheduler.{ StreamingListener, StreamingListenerReceiverError}
+
+import org.apache.bahir.cloudant.CloudantReceiver
+
+object CloudantStreaming {
+  def main(args: Array[String]) {
+    val sparkConf = new SparkConf().setAppName("Cloudant Spark SQL External Datasource in Scala")
+    // Create the context with a 10 seconds batch size
+    val ssc = new StreamingContext(sparkConf, Seconds(10))
+
+    val changes = ssc.receiverStream(new CloudantReceiver(sparkConf, Map(
+      "cloudant.host" -> "ACCOUNT.cloudant.com",
+      "cloudant.username" -> "USERNAME",
+      "cloudant.password" -> "PASSWORD",
+      "database" -> "n_airportcodemapping")))
+
+    changes.foreachRDD((rdd: RDD[String], time: Time) => {
+      // Get the singleton instance of SparkSession
+      val spark = SparkSessionSingleton.getInstance(rdd.sparkContext.getConf)
+
+      println(s"========= $time =========")// scalastyle:ignore
+      // Convert RDD[String] to DataFrame
+      val changesDataFrame = spark.read.json(rdd)
+      if (!changesDataFrame.schema.isEmpty) {
+        changesDataFrame.printSchema()
+        changesDataFrame.select("*").show()
+
+        var hasDelRecord = false
+        var hasAirportNameField = false
+        for (field <- changesDataFrame.schema.fieldNames) {
+          if ("_deleted".equals(field)) {
+            hasDelRecord = true
+          }
+          if ("airportName".equals(field)) {
+            hasAirportNameField = true
+          }
+        }
+        if (hasDelRecord) {
+          changesDataFrame.filter(changesDataFrame("_deleted")).select("*").show()
+        }
+
+        if (hasAirportNameField) {
+          changesDataFrame.filter(changesDataFrame("airportName") >= "Paris").select("*").show()
+          changesDataFrame.registerTempTable("airportcodemapping")
+          val airportCountsDataFrame =
+            spark.sql(
+                s"""
+                |select airportName, count(*) as total
+                |from airportcodemapping
+                |group by airportName
+                """.stripMargin)
+          airportCountsDataFrame.show()
+        }
+      }
+
+    })
+    ssc.start()
+    // run streaming for 120 secs
+    Thread.sleep(120000L)
+    ssc.stop(true)
+  }
+}
+
+/** Lazily instantiated singleton instance of SparkSession */
+object SparkSessionSingleton {
+  @transient  private var instance: SparkSession = _
+  def getInstance(sparkConf: SparkConf): SparkSession = {
+    if (instance == null) {
+      instance = SparkSession
+        .builder
+        .config(sparkConf)
+        .getOrCreate()
+    }
+    instance
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-livy-website/blob/f0d9a84f/sql-cloudant/examples/src/main/scala/org/apache/spark/examples/sql/cloudant/CloudantStreamingSelector.scala
----------------------------------------------------------------------
diff --git a/sql-cloudant/examples/src/main/scala/org/apache/spark/examples/sql/cloudant/CloudantStreamingSelector.scala b/sql-cloudant/examples/src/main/scala/org/apache/spark/examples/sql/cloudant/CloudantStreamingSelector.scala
new file mode 100644
index 0000000..51d939a
--- /dev/null
+++ b/sql-cloudant/examples/src/main/scala/org/apache/spark/examples/sql/cloudant/CloudantStreamingSelector.scala
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.examples.sql.cloudant
+
+import java.util.concurrent.atomic.AtomicLong
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.streaming.{ Seconds, StreamingContext, Time }
+
+import org.apache.bahir.cloudant.CloudantReceiver
+
+object CloudantStreamingSelector {
+  def main(args: Array[String]) {
+    val sparkConf = new SparkConf().setAppName("Cloudant Spark SQL External Datasource in Scala")
+
+    // Create the context with a 10 seconds batch size
+    val ssc = new StreamingContext(sparkConf, Seconds(10))
+    val curTotalAmount = new AtomicLong(0)
+    val curSalesCount = new AtomicLong(0)
+    var batchAmount = 0L
+
+    val changes = ssc.receiverStream(new CloudantReceiver(sparkConf, Map(
+      "cloudant.host" -> "ACCOUNT.cloudant.com",
+      "cloudant.username" -> "USERNAME",
+      "cloudant.password" -> "PASSWORD",
+      "database" -> "sales",
+      "selector" -> "{\"month\":\"May\", \"rep\":\"John\"}")))
+
+    changes.foreachRDD((rdd: RDD[String], time: Time) => {
+      // Get the singleton instance of SQLContext
+      val spark = SparkSessionSingleton.getInstance(rdd.sparkContext.getConf)
+      println(s"========= $time =========") // scalastyle:ignore
+      val changesDataFrame = spark.read.json(rdd)
+      if (!changesDataFrame.schema.isEmpty) {
+        changesDataFrame.select("*").show()
+        batchAmount = changesDataFrame.groupBy().sum("amount").collect()(0).getLong(0)
+        curSalesCount.getAndAdd(changesDataFrame.count())
+        curTotalAmount.getAndAdd(batchAmount)
+        println("Current sales count:" + curSalesCount)// scalastyle:ignore
+        println("Current total amount:" + curTotalAmount)// scalastyle:ignore
+        }
+    })
+
+    ssc.start()
+    ssc.awaitTermination()
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-livy-website/blob/f0d9a84f/sql-cloudant/pom.xml
----------------------------------------------------------------------
diff --git a/sql-cloudant/pom.xml b/sql-cloudant/pom.xml
new file mode 100644
index 0000000..5860033
--- /dev/null
+++ b/sql-cloudant/pom.xml
@@ -0,0 +1,115 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.bahir</groupId>
+    <artifactId>bahir-parent_2.11</artifactId>
+    <version>2.2.0-SNAPSHOT</version>
+    <relativePath>../pom.xml</relativePath>
+  </parent>
+
+  <groupId>org.apache.bahir</groupId>
+  <artifactId>spark-sql-cloudant_2.11</artifactId>
+  <properties>
+    <sbt.project.name>spark-sql-cloudant</sbt.project.name>
+  </properties>
+  <packaging>jar</packaging>
+  <name>Apache Bahir - Spark SQL Cloudant DataSource</name>
+  <url>http://bahir.apache.org/</url>
+
+  <dependencies>
+    <dependency>
+      <groupId>com.typesafe.play</groupId>
+      <artifactId>play-json_${scala.binary.version}</artifactId>
+      <!--version>2.4.8</version-->
+    </dependency>
+    <!--dependency>
+      <groupId>com.fasterxml.jackson.core</groupId>
+      <artifactId>jackson-annotations</artifactId>
+      <version>2.5.4</version>
+    </dependency>
+    <dependency>
+      <groupId>com.fasterxml.jackson.core</groupId>
+      <artifactId>jackson-core</artifactId>
+      <version>2.5.4</version>
+    </dependency>
+    <dependency>
+      <groupId>com.fasterxml.jackson.core</groupId>
+      <artifactId>jackson-databind</artifactId>
+      <version>2.5.4</version>
+    </dependency>
+    <dependency>
+      <groupId>com.fasterxml.jackson.datatype</groupId>
+      <artifactId>jackson-datatype-jdk8</artifactId>
+      <version>2.5.4</version>
+    </dependency>
+    <dependency>
+      <groupId>com.fasterxml.jackson.datatype</groupId>
+      <artifactId>jackson-datatype-jsr310</artifactId>
+      <version>2.5.4</version>
+    </dependency-->
+    <dependency>
+      <groupId>org.scalaj</groupId>
+      <artifactId>scalaj-http_${scala.binary.version}</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-tags_${scala.binary.version}</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-streaming_${scala.binary.version}</artifactId>
+      <version>${spark.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-sql_${scala.binary.version}</artifactId>
+      <version>${spark.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-sql_${scala.binary.version}</artifactId>
+      <version>${spark.version}</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-core_${scala.binary.version}</artifactId>
+      <version>${spark.version}</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.scalacheck</groupId>
+      <artifactId>scalacheck_${scala.binary.version}</artifactId>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+  <build>
+	 <resources>
+	     <resource>
+    	   <directory>src/main/resources</directory>
+     	</resource>
+    </resources>    
+    <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
+    <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
+  </build>
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-livy-website/blob/f0d9a84f/sql-cloudant/src/main/resources/application.conf
----------------------------------------------------------------------
diff --git a/sql-cloudant/src/main/resources/application.conf b/sql-cloudant/src/main/resources/application.conf
new file mode 100644
index 0000000..2d8b236
--- /dev/null
+++ b/sql-cloudant/src/main/resources/application.conf
@@ -0,0 +1,14 @@
+spark-sql {
+    bulkSize = 200
+    schemaSampleSize = -1
+    createDBOnSave = false
+    jsonstore.rdd = {
+        partitions = 10
+        maxInPartition = -1
+        minInPartition = 10
+        requestTimeout = 900000
+    }
+    cloudant = {
+        protocol = https
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-livy-website/blob/f0d9a84f/sql-cloudant/src/main/resources/reference.conf
----------------------------------------------------------------------
diff --git a/sql-cloudant/src/main/resources/reference.conf b/sql-cloudant/src/main/resources/reference.conf
new file mode 100644
index 0000000..e69de29

http://git-wip-us.apache.org/repos/asf/incubator-livy-website/blob/f0d9a84f/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/CloudantConfig.scala
----------------------------------------------------------------------
diff --git a/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/CloudantConfig.scala b/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/CloudantConfig.scala
new file mode 100644
index 0000000..ac14f4b
--- /dev/null
+++ b/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/CloudantConfig.scala
@@ -0,0 +1,273 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bahir.cloudant
+
+import java.net.URLEncoder
+
+import play.api.libs.json.JsArray
+import play.api.libs.json.Json
+import play.api.libs.json.JsValue
+
+import org.apache.bahir.cloudant.common._
+
+/*
+* Only allow one field pushdown now
+* as the filter today does not tell how to link the filters out And v.s. Or
+*/
+
+class CloudantConfig(val protocol: String, val host: String,
+    val dbName: String, val indexName: String = null, val viewName: String = null)
+    (implicit val username: String, val password: String,
+    val partitions: Int, val maxInPartition: Int, val minInPartition: Int,
+    val requestTimeout: Long, val bulkSize: Int, val schemaSampleSize: Int,
+    val createDBOnSave: Boolean, val selector: String)
+    extends Serializable{
+
+  private val SCHEMA_FOR_ALL_DOCS_NUM = -1
+  private lazy val dbUrl = {protocol + "://" + host + "/" + dbName}
+
+  val pkField = "_id"
+  val defaultIndex = "_all_docs" // "_changes" does not work for partition
+  val default_filter: String = "*:*"
+
+  def getChangesUrl(): String = {
+    dbUrl + "/_changes?include_docs=true&feed=normal"
+  }
+
+  def getContinuousChangesUrl(): String = {
+    var url = dbUrl + "/_changes?include_docs=true&feed=continuous&heartbeat=3000"
+    if (selector != null) {
+      url = url + "&filter=_selector"
+    }
+    url
+  }
+
+  def getSelector() : String = {
+    selector
+  }
+
+  def getDbUrl(): String = {
+    dbUrl
+  }
+
+  def getLastUrl(skip: Int): String = {
+    if (skip ==0 ) null
+    else s"$dbUrl/$defaultIndex?limit=$skip"
+  }
+
+  def getSchemaSampleSize(): Int = {
+    schemaSampleSize
+  }
+
+  def getCreateDBonSave(): Boolean = {
+    createDBOnSave
+  }
+
+  def getLastNum(result: JsValue): JsValue = (result \ "last_seq").get
+
+  def getTotalUrl(url: String): String = {
+    if (url.contains('?')) {
+      url + "&limit=1"
+    } else {
+      url + "?limit=1"
+    }
+  }
+
+  def getDbname(): String = {
+    dbName
+  }
+
+  def allowPartition(): Boolean = {indexName==null}
+
+  def getOneUrl(): String = {
+    dbUrl + "/_all_docs?limit=1&include_docs=true"
+  }
+
+  def getOneUrlExcludeDDoc1(): String = {
+    dbUrl + "/_all_docs?endkey=%22_design/%22&limit=1&include_docs=true"
+  }
+
+  def getOneUrlExcludeDDoc2(): String = {
+    dbUrl + "/_all_docs?startkey=%22_design0/%22&limit=1&include_docs=true"
+  }
+
+  def getAllDocsUrlExcludeDDoc(limit: Int): String = {
+    if (viewName == null) {
+      dbUrl + "/_all_docs?startkey=%22_design0/%22&limit=" + limit + "&include_docs=true"
+    } else {
+      dbUrl + "/" + viewName + "?limit=1"
+    }
+  }
+
+  def getAllDocsUrl(limit: Int): String = {
+    if (viewName == null) {
+      if (limit == SCHEMA_FOR_ALL_DOCS_NUM) {
+        dbUrl + "/_all_docs?include_docs=true"
+      } else {
+        dbUrl + "/_all_docs?limit=" + limit + "&include_docs=true"
+      }
+    } else {
+      if (limit == JsonStoreConfigManager.SCHEMA_FOR_ALL_DOCS_NUM) {
+        dbUrl + "/" + viewName
+      } else {
+        dbUrl + "/" + viewName + "?limit=" + limit
+      }
+    }
+  }
+
+  def getRangeUrl(field: String = null, start: Any = null,
+      startInclusive: Boolean = false, end: Any = null,
+      endInclusive: Boolean = false,
+      includeDoc: Boolean = true): (String, Boolean) = {
+    val (url: String, pusheddown: Boolean) =
+      calculate(field, start, startInclusive, end, endInclusive)
+    if (includeDoc) {
+      if (url.indexOf('?') > 0) {
+        (url + "&include_docs=true", pusheddown)
+      } else {
+        (url + "?include_docs=true", pusheddown)
+      }
+    } else {
+      (url, pusheddown)
+    }
+  }
+
+  private def calculate(field: String, start: Any, startInclusive: Boolean,
+      end: Any, endInclusive: Boolean): (String, Boolean) = {
+    if (field != null && field.equals(pkField)) {
+      var condition = ""
+      if (start != null && end != null && start.equals(end)) {
+        condition += "?key=%22" + URLEncoder.encode(start.toString(), "UTF-8") + "%22"
+      } else {
+        if (start != null) {
+          condition += "?startkey=%22" + URLEncoder.encode(
+              start.toString(), "UTF-8") + "%22"
+        }
+        if (end != null) {
+          if (start != null) {
+            condition += "&"
+          } else {
+            condition += "?"
+          }
+          condition += "endkey=%22" + URLEncoder.encode(end.toString(), "UTF-8") + "%22"
+        }
+      }
+      (dbUrl + "/_all_docs" + condition, true)
+    } else if (indexName!=null) {
+      //  push down to indexName
+      val condition = calculateCondition(field, start, startInclusive,
+        end, endInclusive)
+      (dbUrl + "/" + indexName + "?q=" + condition, true)
+    } else if (viewName != null) {
+      (dbUrl + "/" + viewName, true)
+    } else {
+      (s"$dbUrl/$defaultIndex", false)
+    }
+
+  }
+
+  def calculateCondition(field: String, min: Any, minInclusive: Boolean = false,
+        max: Any, maxInclusive: Boolean = false) : String = {
+    if (field != null && (min != null || max!= null)) {
+      var condition = field + ":"
+      if (min!=null && max!=null && min.equals(max)) {
+        condition += min
+      } else {
+        if (minInclusive) {
+          condition+="["
+        } else {
+          condition +="{"
+        }
+        if (min!=null) {
+          condition += min
+        } else {
+          condition+="*"
+        }
+        condition+=" TO "
+        if (max !=null) {
+          condition += max
+        } else {
+          condition += "*"
+        }
+        if (maxInclusive) {
+          condition+="]"
+        } else {
+          condition +="}"
+        }
+      }
+      URLEncoder.encode(condition, "UTF-8")
+    } else {
+      default_filter
+    }
+  }
+
+  def getSubSetUrl (url: String, skip: Int, limit: Int)
+      (implicit convertSkip: (Int) => String): String = {
+    val suffix = {
+      if (url.indexOf("_all_docs")>0) "include_docs=true&limit=" +
+        limit + "&skip=" + skip
+      else if (url.indexOf("_changes")>0) "include_docs=true&limit=" +
+          limit + "&since=" + convertSkip(skip)
+      else if (viewName != null) {
+        "limit=" + limit + "&skip=" + skip
+      } else {
+        "include_docs=true&limit=" + limit
+      } // TODO Index query does not support subset query. Should disable Partitioned loading?
+    }
+    if (url.indexOf('?') > 0) {
+      url + "&" + suffix
+    }
+    else {
+      url + "?" + suffix
+    }
+  }
+
+  def getTotalRows(result: JsValue): Int = {
+    val tr = (result \ "total_rows").asOpt[Int]
+    tr match {
+      case None =>
+        (result \ "pending").as[Int] + 1
+      case Some(tr2) =>
+        tr2
+    }
+  }
+
+  def getRows(result: JsValue): Seq[JsValue] = {
+    if (viewName == null) {
+      ((result \ "rows").as[JsArray]).value.map(row => (row \ "doc").get)
+    } else {
+      ((result \ "rows").as[JsArray]).value.map(row => row)
+    }
+  }
+
+  def getBulkPostUrl(): String = {
+    dbUrl + "/_bulk_docs"
+  }
+
+  def getBulkRows(rows: List[String]): String = {
+    val docs = rows.map { x => Json.parse(x) }
+    Json.stringify(Json.obj("docs" -> Json.toJson(docs.toSeq)))
+  }
+
+  def getConflictErrStr(): String = {
+    """"error":"conflict""""
+  }
+
+  def getForbiddenErrStr(): String = {
+    """"error":"forbidden""""
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-livy-website/blob/f0d9a84f/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/CloudantReceiver.scala
----------------------------------------------------------------------
diff --git a/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/CloudantReceiver.scala b/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/CloudantReceiver.scala
new file mode 100644
index 0000000..0446660
--- /dev/null
+++ b/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/CloudantReceiver.scala
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bahir.cloudant
+
+// scalastyle:off
+import scalaj.http._
+
+import play.api.libs.json.Json
+
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.receiver.Receiver
+import org.apache.spark.SparkConf
+
+import org.apache.bahir.cloudant.common._
+// scalastyle:on
+
+class CloudantReceiver(sparkConf: SparkConf, cloudantParams: Map[String, String])
+    extends Receiver[String](StorageLevel.MEMORY_AND_DISK) {
+  lazy val config: CloudantConfig = {
+    JsonStoreConfigManager.getConfig(sparkConf, cloudantParams)
+      .asInstanceOf[CloudantConfig]
+  }
+
+  def onStart() {
+    // Start the thread that receives data over a connection
+    new Thread("Cloudant Receiver") {
+      override def run() { receive() }
+    }.start()
+  }
+
+  private def receive(): Unit = {
+    val url = config.getContinuousChangesUrl()
+    val selector: String = if (config.getSelector() != null) {
+      "{\"selector\":" + config.getSelector() + "}"
+    } else {
+      "{}"
+    }
+
+    val clRequest: HttpRequest = config.username match {
+      case null =>
+        Http(url)
+          .postData(selector)
+          .timeout(connTimeoutMs = 1000, readTimeoutMs = 0)
+          .header("Content-Type", "application/json")
+          .header("User-Agent", "spark-cloudant")
+      case _ =>
+        Http(url)
+          .postData(selector)
+          .timeout(connTimeoutMs = 1000, readTimeoutMs = 0)
+          .header("Content-Type", "application/json")
+          .header("User-Agent", "spark-cloudant")
+          .auth(config.username, config.password)
+    }
+
+    clRequest.exec((code, headers, is) => {
+      if (code == 200) {
+        scala.io.Source.fromInputStream(is, "utf-8").getLines().foreach(line => {
+          if (line.length() > 0) {
+            val json = Json.parse(line)
+            val jsonDoc = (json \ "doc").get
+            val doc = Json.stringify(jsonDoc)
+            store(doc)
+          }
+        })
+      } else {
+        val status = headers.getOrElse("Status", IndexedSeq.empty)
+        val errorMsg = "Error retrieving _changes feed " + config.getDbname() + ": " + status(0)
+        reportError(errorMsg, new RuntimeException(errorMsg))
+      }
+    })
+  }
+
+  def onStop(): Unit = {
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-livy-website/blob/f0d9a84f/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/DefaultSource.scala
----------------------------------------------------------------------
diff --git a/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/DefaultSource.scala b/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/DefaultSource.scala
new file mode 100644
index 0000000..4c973f7
--- /dev/null
+++ b/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/DefaultSource.scala
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bahir.cloudant
+
+import org.slf4j.LoggerFactory
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql._
+import org.apache.spark.sql.sources._
+import org.apache.spark.sql.types._
+
+import org.apache.bahir.cloudant.common.{FilterInterpreter, JsonStoreDataAccess, JsonStoreRDD, _}
+
+case class CloudantReadWriteRelation (config: CloudantConfig,
+                                      schema: StructType,
+                                      allDocsDF: DataFrame = null)
+                      (@transient val sqlContext: SQLContext)
+  extends BaseRelation with PrunedFilteredScan  with InsertableRelation {
+
+   @transient lazy val dataAccess = {new JsonStoreDataAccess(config)}
+
+    implicit lazy val logger = LoggerFactory.getLogger(getClass)
+
+    def buildScan(requiredColumns: Array[String],
+                filters: Array[Filter]): RDD[Row] = {
+      val colsLength = requiredColumns.length
+
+      if (allDocsDF != null) {
+        if (colsLength == 0) {
+          allDocsDF.select().rdd
+        } else if (colsLength == 1) {
+          allDocsDF.select(requiredColumns(0)).rdd
+        } else {
+          val colsExceptCol0 = for (i <- 1 until colsLength) yield requiredColumns(i)
+          allDocsDF.select(requiredColumns(0), colsExceptCol0: _*).rdd
+        }
+      } else {
+        val filterInterpreter = new FilterInterpreter(filters)
+        var searchField: String = {
+          if (filterInterpreter.containsFiltersFor(config.pkField)) {
+            config.pkField
+          } else {
+            filterInterpreter.firstField
+          }
+        }
+
+        val (min, minInclusive, max, maxInclusive) = filterInterpreter.getInfo(searchField)
+        implicit val columns = requiredColumns
+        val (url: String, pusheddown: Boolean) = config.getRangeUrl(searchField,
+            min, minInclusive, max, maxInclusive, false)
+        if (!pusheddown) searchField = null
+        implicit val attrToFilters = filterInterpreter.getFiltersForPostProcess(searchField)
+
+        val cloudantRDD = new JsonStoreRDD(sqlContext.sparkContext, config, url)
+        val df = sqlContext.read.json(cloudantRDD)
+        if (colsLength > 1) {
+          val colsExceptCol0 = for (i <- 1 until colsLength) yield requiredColumns(i)
+          df.select(requiredColumns(0), colsExceptCol0: _*).rdd
+        } else {
+          df.rdd
+        }
+      }
+    }
+
+
+  def insert(data: DataFrame, overwrite: Boolean): Unit = {
+      if (config.getCreateDBonSave()) {
+        dataAccess.createDB()
+      }
+      if (data.count() == 0) {
+        logger.warn(("Database " + config.getDbname() +
+          ": nothing was saved because the number of records was 0!"))
+      } else {
+        val result = data.toJSON.foreachPartition { x =>
+          val list = x.toList // Has to pass as List, Iterator results in 0 data
+          dataAccess.saveAll(list)
+        }
+      }
+    }
+}
+
+class DefaultSource extends RelationProvider
+  with CreatableRelationProvider
+  with SchemaRelationProvider {
+
+  val logger = LoggerFactory.getLogger(getClass)
+
+  def createRelation(sqlContext: SQLContext,
+                     parameters: Map[String, String]): CloudantReadWriteRelation = {
+      create(sqlContext, parameters, null)
+    }
+
+    private def create(sqlContext: SQLContext,
+                       parameters: Map[String, String],
+                       inSchema: StructType) = {
+
+      val config: CloudantConfig = JsonStoreConfigManager.getConfig(sqlContext, parameters)
+
+      var allDocsDF: DataFrame = null
+
+      val schema: StructType = {
+        if (inSchema != null) {
+          inSchema
+        } else {
+          val df = if (config.getSchemaSampleSize() ==
+            JsonStoreConfigManager.SCHEMA_FOR_ALL_DOCS_NUM &&
+            config.viewName == null
+            && config.indexName == null) {
+            val filterInterpreter = new FilterInterpreter(null)
+            var searchField = null
+            val (min, minInclusive, max, maxInclusive) =
+                filterInterpreter.getInfo(searchField)
+            val (url: String, pusheddown: Boolean) = config.getRangeUrl(searchField,
+                min, minInclusive, max, maxInclusive, false)
+            val cloudantRDD = new JsonStoreRDD(sqlContext.sparkContext, config, url)
+            allDocsDF = sqlContext.read.json(cloudantRDD)
+            allDocsDF
+          } else {
+            val dataAccess = new JsonStoreDataAccess(config)
+            val aRDD = sqlContext.sparkContext.parallelize(
+                dataAccess.getMany(config.getSchemaSampleSize()))
+            sqlContext.read.json(aRDD)
+          }
+          df.schema
+        }
+      }
+      CloudantReadWriteRelation(config, schema, allDocsDF)(sqlContext)
+    }
+
+    def createRelation(sqlContext: SQLContext,
+                       mode: SaveMode,
+                       parameters: Map[String, String],
+                       data: DataFrame): CloudantReadWriteRelation = {
+      val relation = create(sqlContext, parameters, data.schema)
+      relation.insert(data, mode==SaveMode.Overwrite)
+      relation
+    }
+
+    def createRelation(sqlContext: SQLContext,
+                       parameters: Map[String, String],
+                       schema: StructType): CloudantReadWriteRelation = {
+      create(sqlContext, parameters, schema)
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-livy-website/blob/f0d9a84f/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/common/FilterUtil.scala
----------------------------------------------------------------------
diff --git a/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/common/FilterUtil.scala b/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/common/FilterUtil.scala
new file mode 100644
index 0000000..12cd81c
--- /dev/null
+++ b/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/common/FilterUtil.scala
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bahir.cloudant.common
+
+import org.slf4j.LoggerFactory
+import play.api.libs.json.{JsString, JsValue}
+
+import org.apache.spark.sql.sources._
+
+
+/**
+ * Only handles the following filter condition
+ * 1. EqualTo,GreaterThan,LessThan,GreaterThanOrEqual,LessThanOrEqual,In
+ * 2. recursive AND of (filters in 1 and AND). Issue: Spark 1.3.0 does not return
+ *    AND filter instead returned 2 filters
+ */
+class FilterInterpreter(origFilters: Array[Filter]) {
+
+  private val logger = LoggerFactory.getLogger(getClass)
+
+  lazy val firstField = {
+    if (origFilters.length > 0) getFilterAttribute(origFilters(0))
+    else null
+  }
+
+  private lazy val filtersByAttr = {
+    origFilters
+      .filter(f => getFilterAttribute(f) != null)
+      .map(f => (getFilterAttribute(f), f))
+      .groupBy(attrFilter => attrFilter._1)
+      .mapValues(a => a.map(p => p._2))
+  }
+
+  private def getFilterAttribute(f: Filter): String = {
+    val result = f match {
+      case EqualTo(attr, v) => attr
+      case GreaterThan(attr, v) => attr
+      case LessThan(attr, v) => attr
+      case GreaterThanOrEqual(attr, v) => attr
+      case LessThanOrEqual(attr, v) => attr
+      case In(attr, v) => attr
+      case IsNotNull(attr) => attr
+      case IsNull(attr) => attr
+      case _ => null
+    }
+    result
+  }
+
+  def containsFiltersFor(key: String): Boolean = {
+    filtersByAttr.contains(key)
+  }
+
+  private lazy val analyzedFilters = {
+    filtersByAttr.map(m => m._1 -> analyze(m._2))
+  }
+
+  private def analyze(filters: Array[Filter]): (Any, Boolean, Any, Boolean, Array[Filter]) = {
+
+    var min: Any = null
+    var minInclusive: Boolean = false
+    var max: Any = null
+    var maxInclusive: Boolean = false
+    var others: Array[Filter] = Array[Filter]()
+
+    def evaluate(filter: Filter) {
+      filter match {
+        case GreaterThanOrEqual(attr, v) => min = v; minInclusive = true
+        case LessThanOrEqual(attr, v) => max = v; maxInclusive = true
+        case EqualTo(attr, v) => min = v; max = v
+        case GreaterThan(attr, v) => min = v
+        case LessThan(attr, v) => max = v
+        case _ => others = others :+ filter
+      }
+    }
+
+    filters.map(f => evaluate(f))
+
+    logger.info(s"Calculated range info: min=$min," +
+      s" minInclusive=$minInclusive," +
+      s"max=$max," +
+      s"maxInclusive=$maxInclusive," +
+      s"others=$others")
+    (min, minInclusive, max, maxInclusive, others)
+  }
+
+  def getInfo(field: String): (Any, Boolean, Any, Boolean) = {
+    if (field == null) (null, false, null, false)
+    else {
+      val data = analyzedFilters.getOrElse(field, (null, false, null, false, null))
+      (data._1, data._2, data._3, data._4)
+    }
+  }
+
+  def getFiltersForPostProcess(pushdownField: String): Map[String, Array[Filter]] = {
+    filtersByAttr.map(f => {
+      if (f._1.equals(pushdownField)) f._1 -> analyzedFilters.get(pushdownField).get._5
+      else f._1 -> f._2
+    })
+  }
+}
+
+/**
+ *
+ */
+class FilterUtil(filters: Map[String, Array[Filter]]) {
+  private val logger = LoggerFactory.getLogger(getClass)
+  def apply(implicit r: JsValue = null): Boolean = {
+    if (r == null) return true
+    val satisfied = filters.forall({
+      case (attr, filters) =>
+        val field = JsonUtil.getField(r, attr).getOrElse(null)
+        if (field == null) {
+          logger.debug(s"field $attr not exisit:$r")
+          false
+        } else {
+          true
+        }
+    })
+    satisfied
+  }
+}
+
+
+object FilterDDocs {
+  def filter(row: JsValue): Boolean = {
+    if (row == null) return true
+    val id : String = JsonUtil.getField(row, "_id").
+        getOrElse(null).as[JsString].value
+    if (id.startsWith("_design")) {
+      false
+    } else {
+      true
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-livy-website/blob/f0d9a84f/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/common/JsonStoreConfigManager.scala
----------------------------------------------------------------------
diff --git a/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/common/JsonStoreConfigManager.scala b/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/common/JsonStoreConfigManager.scala
new file mode 100644
index 0000000..92192bb
--- /dev/null
+++ b/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/common/JsonStoreConfigManager.scala
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bahir.cloudant.common
+
+import com.typesafe.config.ConfigFactory
+
+import org.apache.spark.sql.SQLContext
+import org.apache.spark.SparkConf
+
+import org.apache.bahir.cloudant.CloudantConfig
+
+ object JsonStoreConfigManager {
+  val CLOUDANT_CONNECTOR_VERSION = "2.0.0"
+  val SCHEMA_FOR_ALL_DOCS_NUM = -1
+
+  private val CLOUDANT_HOST_CONFIG = "cloudant.host"
+  private val CLOUDANT_USERNAME_CONFIG = "cloudant.username"
+  private val CLOUDANT_PASSWORD_CONFIG = "cloudant.password"
+  private val CLOUDANT_PROTOCOL_CONFIG = "cloudant.protocol"
+
+
+  private val PARTITION_CONFIG = "jsonstore.rdd.partitions"
+  private val MAX_IN_PARTITION_CONFIG = "jsonstore.rdd.maxInPartition"
+  private val MIN_IN_PARTITION_CONFIG = "jsonstore.rdd.minInPartition"
+  private val REQUEST_TIMEOUT_CONFIG = "jsonstore.rdd.requestTimeout"
+  private val BULK_SIZE_CONFIG = "bulkSize"
+  private val SCHEMA_SAMPLE_SIZE_CONFIG = "schemaSampleSize"
+  private val CREATE_DB_ON_SAVE = "createDBOnSave"
+
+
+  private val configFactory = ConfigFactory.load()
+
+  private val ROOT_CONFIG_NAME = "spark-sql"
+  private val rootConfig = configFactory.getConfig(ROOT_CONFIG_NAME)
+
+
+  /**
+   * The sequence of getting configuration
+   * 1. "spark."+key in the SparkConf
+   *  (as they are treated as the one passed in through spark-submit)
+   * 2. key in the parameters, which is set in DF option
+   * 3. key in the SparkConf, which is set in SparkConf
+   * 4. default in the Config, which is set in the application.conf
+   */
+
+
+  private def getInt(sparkConf: SparkConf, parameters: Map[String, String],
+      key: String) : Int = {
+    val valueS = parameters.getOrElse(key, null)
+    if (sparkConf != null) {
+      val default = {
+        if (valueS == null) {
+          sparkConf.getInt(key, rootConfig.getInt(key))
+        } else {
+          valueS.toInt
+        }
+      }
+      sparkConf.getInt(s"spark.$key", default)
+    } else {
+      if (valueS == null) {
+        rootConfig.getInt(key)
+      } else {
+        valueS.toInt
+      }
+    }
+  }
+
+  private def getLong(sparkConf: SparkConf, parameters: Map[String, String],
+      key: String) : Long = {
+    val valueS = parameters.getOrElse(key, null)
+    if (sparkConf != null) {
+      val default = {
+        if (valueS == null) {
+          sparkConf.getLong(key, rootConfig.getLong(key))
+        } else {
+          valueS.toLong
+        }
+      }
+      sparkConf.getLong(s"spark.$key", default)
+    } else {
+      if (valueS == null) rootConfig.getLong(key) else valueS.toLong
+    }
+  }
+
+  private def getString(sparkConf: SparkConf, parameters: Map[String, String],
+      key: String) : String = {
+    val defaultInConfig = if (rootConfig.hasPath(key)) rootConfig.getString(key) else null
+    val valueS = parameters.getOrElse(key, null)
+    if (sparkConf != null) {
+      val default = {
+        if (valueS == null) {
+          sparkConf.get(key, defaultInConfig)
+        } else {
+          valueS
+        }
+      }
+      sparkConf.get(s"spark.$key", default)
+    } else {
+      if (valueS == null) defaultInConfig else valueS
+    }
+  }
+
+  private def getBool(sparkConf: SparkConf, parameters: Map[String, String],
+      key: String) : Boolean = {
+    val valueS = parameters.getOrElse(key, null)
+    if (sparkConf != null) {
+      val default = {
+        if (valueS == null) {
+          sparkConf.getBoolean(key, rootConfig.getBoolean(key))
+        } else {
+          valueS.toBoolean
+        }
+      }
+      sparkConf.getBoolean(s"spark.$key", default)
+    } else
+    if (valueS == null) {
+      rootConfig.getBoolean(key)
+    } else {
+      valueS.toBoolean
+    }
+  }
+
+
+
+  def getConfig(context: SQLContext, parameters: Map[String, String]): CloudantConfig = {
+
+    val sparkConf = context.sparkContext.getConf
+
+    implicit val total = getInt(sparkConf, parameters, PARTITION_CONFIG)
+    implicit val max = getInt(sparkConf, parameters, MAX_IN_PARTITION_CONFIG)
+    implicit val min = getInt(sparkConf, parameters, MIN_IN_PARTITION_CONFIG)
+    implicit val requestTimeout = getLong(sparkConf, parameters, REQUEST_TIMEOUT_CONFIG)
+    implicit val bulkSize = getInt(sparkConf, parameters, BULK_SIZE_CONFIG)
+    implicit val schemaSampleSize = getInt(sparkConf, parameters, SCHEMA_SAMPLE_SIZE_CONFIG)
+    implicit val createDBOnSave = getBool(sparkConf, parameters, CREATE_DB_ON_SAVE)
+
+    val dbName = parameters.getOrElse("database", parameters.getOrElse("path", null))
+    val indexName = parameters.getOrElse("index", null)
+    val viewName = parameters.getOrElse("view", null)
+
+    // FIXME: Add logger
+    // scalastyle:off println
+    println(s"Use connectorVersion=$CLOUDANT_CONNECTOR_VERSION, dbName=$dbName, " +
+        s"indexName=$indexName, viewName=$viewName," +
+        s"$PARTITION_CONFIG=$total, $MAX_IN_PARTITION_CONFIG=$max," +
+        s"$MIN_IN_PARTITION_CONFIG=$min, $REQUEST_TIMEOUT_CONFIG=$requestTimeout," +
+        s"$BULK_SIZE_CONFIG=$bulkSize, $SCHEMA_SAMPLE_SIZE_CONFIG=$schemaSampleSize")
+    // scalastyle:on println
+
+    val protocol = getString(sparkConf, parameters, CLOUDANT_PROTOCOL_CONFIG)
+    val host = getString( sparkConf, parameters, CLOUDANT_HOST_CONFIG)
+    val user = getString(sparkConf, parameters, CLOUDANT_USERNAME_CONFIG)
+    val passwd = getString(sparkConf, parameters, CLOUDANT_PASSWORD_CONFIG)
+    val selector = getString(sparkConf, parameters, "selector")
+
+    if (host != null) {
+      new CloudantConfig(protocol, host, dbName, indexName,
+        viewName) (user, passwd, total, max, min, requestTimeout, bulkSize,
+        schemaSampleSize, createDBOnSave, selector)
+    } else {
+      throw new RuntimeException("Spark configuration is invalid! " +
+        "Please make sure to supply required values for cloudant.host.")
+      }
+  }
+
+  def getConfig(sparkConf: SparkConf, parameters: Map[String, String]): CloudantConfig = {
+
+    implicit val total = getInt(sparkConf, parameters, PARTITION_CONFIG)
+    implicit val max = getInt(sparkConf, parameters, MAX_IN_PARTITION_CONFIG)
+    implicit val min = getInt(sparkConf, parameters, MIN_IN_PARTITION_CONFIG)
+    implicit val requestTimeout = getLong(sparkConf, parameters, REQUEST_TIMEOUT_CONFIG)
+    implicit val bulkSize = getInt(sparkConf, parameters, BULK_SIZE_CONFIG)
+    implicit val schemaSampleSize = getInt(sparkConf, parameters, SCHEMA_SAMPLE_SIZE_CONFIG)
+    implicit val createDBOnSave = getBool(sparkConf, parameters, CREATE_DB_ON_SAVE)
+
+    val dbName = parameters.getOrElse("database", null)
+
+    // scalastyle:off println
+    println(s"Use connectorVersion=$CLOUDANT_CONNECTOR_VERSION, dbName=$dbName, " +
+      s"$REQUEST_TIMEOUT_CONFIG=$requestTimeout")
+    // scalastyle:on println
+
+    val protocol = getString(sparkConf, parameters, CLOUDANT_PROTOCOL_CONFIG)
+    val host = getString( sparkConf, parameters, CLOUDANT_HOST_CONFIG)
+    val user = getString(sparkConf, parameters, CLOUDANT_USERNAME_CONFIG)
+    val passwd = getString(sparkConf, parameters, CLOUDANT_PASSWORD_CONFIG)
+    val selector = getString(sparkConf, parameters, "selector")
+
+    if (host != null) {
+      new CloudantConfig(protocol, host, dbName)(user, passwd,
+        total, max, min, requestTimeout, bulkSize,
+        schemaSampleSize, createDBOnSave, selector)
+    } else {
+      throw new RuntimeException("Cloudant parameters are invalid!" +
+          "Please make sure to supply required values for cloudant.host.")
+    }
+  }
+}