Posted to commits@livy.apache.org by lr...@apache.org on 2017/07/20 01:37:21 UTC

[35/50] [abbrv] incubator-livy-website git commit: [BAHIR-102] Initial support of Cloudant Query and examples

[BAHIR-102] Initial support of Cloudant Query and examples

Add an optimization to use Cloudant Query (_find) in applicable scenarios.

Closes #41.


Project: http://git-wip-us.apache.org/repos/asf/incubator-livy-website/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-livy-website/commit/fd4c35fc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-livy-website/tree/fd4c35fc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-livy-website/diff/fd4c35fc

Branch: refs/heads/master
Commit: fd4c35fc9f7ebb57464d231cf5d66e7bc4096a1b
Parents: abfdc70
Author: Yang Lei <ge...@gmail.com>
Authored: Fri Apr 7 19:23:43 2017 -0400
Committer: Luciano Resende <lr...@apache.org>
Committed: Thu Apr 13 12:15:10 2017 -0700

----------------------------------------------------------------------
 sql-cloudant/README.md                          |   2 +
 sql-cloudant/examples/python/CloudantQuery.py   |  65 ++++++++++
 sql-cloudant/examples/python/CloudantQueryDF.py |  61 +++++++++
 .../src/main/resources/application.conf         |   2 +
 .../apache/bahir/cloudant/CloudantConfig.scala  |  94 ++++++--------
 .../apache/bahir/cloudant/DefaultSource.scala   |  32 +----
 .../common/JsonStoreConfigManager.scala         |  62 ++-------
 .../cloudant/common/JsonStoreDataAccess.scala   |  79 ++++++------
 .../bahir/cloudant/common/JsonStoreRDD.scala    | 129 ++++++++++++++++---
 9 files changed, 338 insertions(+), 188 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-livy-website/blob/fd4c35fc/sql-cloudant/README.md
----------------------------------------------------------------------
diff --git a/sql-cloudant/README.md b/sql-cloudant/README.md
index eaa8893..38d2bbb 100644
--- a/sql-cloudant/README.md
+++ b/sql-cloudant/README.md
@@ -62,6 +62,8 @@ cloudant.protocol|https|protocol to use to transfer data: http or https
 cloudant.host||cloudant host url
 cloudant.username||cloudant userid
 cloudant.password||cloudant password
+cloudant.useQuery|false|By default, the _all_docs endpoint is used if the 'view' and 'index' configurations (see below) are not set. When useQuery is enabled, the _find endpoint is used in place of _all_docs whenever the query condition is not on the primary key field (_id), so that query predicates can be pushed down to the datastore.
+cloudant.queryLimit|25|The maximum number of results returned when querying the _find endpoint.
 jsonstore.rdd.partitions|10|the number of partitions intended to drive JsonStoreRDD loading of query results in parallel. The actual number is calculated based on total rows returned, subject to maxInPartition and minInPartition
 jsonstore.rdd.maxInPartition|-1|the max rows in a partition. -1 means unlimited
 jsonstore.rdd.minInPartition|10|the min rows in a partition.
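
For example, a minimal PySpark session that enables the new options might look like the sketch below (host, credentials, and database name are placeholders, mirroring the bundled examples):

    from pyspark.sql import SparkSession

    spark = SparkSession.builder \
        .appName("Cloudant Query pushdown sketch") \
        .config("cloudant.host", "ACCOUNT.cloudant.com") \
        .config("cloudant.username", "USERNAME") \
        .config("cloudant.password", "PASSWORD") \
        .config("cloudant.useQuery", "true") \
        .config("cloudant.queryLimit", "100") \
        .getOrCreate()

    # With useQuery enabled, a predicate on a non-_id field can be served
    # by POST /_find instead of GET /_all_docs.
    df = spark.read.load("n_airportcodemapping", "org.apache.bahir.cloudant")
    df.filter(df.airportName == "Moscow").select("_id", "airportName").show()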

http://git-wip-us.apache.org/repos/asf/incubator-livy-website/blob/fd4c35fc/sql-cloudant/examples/python/CloudantQuery.py
----------------------------------------------------------------------
diff --git a/sql-cloudant/examples/python/CloudantQuery.py b/sql-cloudant/examples/python/CloudantQuery.py
new file mode 100644
index 0000000..5ca5c44
--- /dev/null
+++ b/sql-cloudant/examples/python/CloudantQuery.py
@@ -0,0 +1,65 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import pprint
+from pyspark.sql import SparkSession
+
+# define cloudant related configuration
+# set protocol to http if needed, default value=https
+# config("cloudant.protocol","http")
+spark = SparkSession\
+    .builder\
+    .appName("Cloudant Spark SQL Example in Python using query")\
+    .config("cloudant.host","ACCOUNT.cloudant.com")\
+    .config("cloudant.username", "USERNAME")\
+    .config("cloudant.password","PASSWORD")\
+    .config("jsonstore.rdd.partitions", 8)\
+    .config("cloudant.useQuery", "true")\
+    .config("schemaSampleSize",1)\
+    .getOrCreate()
+
+
+spark.sql(" CREATE TEMPORARY VIEW airportTable1 USING org.apache.bahir.cloudant OPTIONS ( database 'n_airportcodemapping')")
+airportData = spark.sql("SELECT _id, airportName FROM airportTable1 WHERE airportName == 'Moscow' ")
+airportData.printSchema()
+print('Total # of rows in airportData: ' + str(airportData.count()))
+airportData.show()
+
+spark.sql(" CREATE TEMPORARY VIEW airportTable2 USING org.apache.bahir.cloudant OPTIONS ( database 'n_airportcodemapping')")
+airportData = spark.sql("SELECT _id, airportName FROM airportTable2 WHERE airportName > 'Moscow' ORDER BY _id")
+airportData.printSchema()
+print('Total # of rows in airportData: ' + str(airportData.count()))
+airportData.show()
+
+spark.sql(" CREATE TEMPORARY VIEW airportTable3 USING org.apache.bahir.cloudant OPTIONS ( database 'n_airportcodemapping')")
+airportData = spark.sql("SELECT _id, airportName FROM airportTable3 WHERE airportName > 'Moscow' AND  airportName < 'Sydney'  ORDER BY _id")
+airportData.printSchema()
+print('Total # of rows in airportData: ' + str(airportData.count()))
+airportData.show()
+
+spark.sql(" CREATE TEMPORARY VIEW flight1 USING org.apache.bahir.cloudant OPTIONS ( database 'n_flight')")
+flightData = spark.sql("SELECT flightSegmentId, economyClassBaseCost, numFirstClassSeats FROM flight1 WHERE economyClassBaseCost >=200 AND numFirstClassSeats<=10")
+flightData.printSchema()
+print('Total # of rows in flightData: ' + str(flightData.count()))
+flightData.show()
+
+spark.sql(" CREATE TEMPORARY VIEW flight2 USING org.apache.bahir.cloudant OPTIONS ( database 'n_flight')")
+flightData = spark.sql("SELECT flightSegmentId, scheduledDepartureTime, scheduledArrivalTime FROM flight2 WHERE scheduledDepartureTime >='2014-12-15T05:00:00.000Z' AND scheduledArrivalTime <='2014-12-15T11:04:00.000Z'")
+flightData.printSchema()
+print('Total # of rows in flightData: ' + str(flightData.count()))
+flightData.show()
+
+
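
With useQuery enabled, a range predicate such as the one on airportTable3 above (airportName > 'Moscow' AND airportName < 'Sydney') is translated into a Cloudant Query selector and POSTed to /_find. A plain-Python illustration of the request body that JsonStoreRDD assembles (the operator mapping comes from convertToMangoJson; limit defaults to cloudant.queryLimit and skip to 0, since _find scans run in a single partition):

    import json

    body = json.dumps({
        "selector": {"airportName": {"$gt": "Moscow", "$lt": "Sydney"}},
        "fields": ["_id", "airportName"],  # pruned to the required columns
        "limit": 25,                       # cloudant.queryLimit default
        "skip": 0
    })
    # POSTed to https://ACCOUNT.cloudant.com/n_airportcodemapping/_find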

http://git-wip-us.apache.org/repos/asf/incubator-livy-website/blob/fd4c35fc/sql-cloudant/examples/python/CloudantQueryDF.py
----------------------------------------------------------------------
diff --git a/sql-cloudant/examples/python/CloudantQueryDF.py b/sql-cloudant/examples/python/CloudantQueryDF.py
new file mode 100644
index 0000000..c8fa296
--- /dev/null
+++ b/sql-cloudant/examples/python/CloudantQueryDF.py
@@ -0,0 +1,61 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import pprint
+from pyspark.sql import SparkSession
+
+# define cloudant related configuration
+# set protocol to http if needed, default value=https
+# config("cloudant.protocol","http")
+spark = SparkSession\
+    .builder\
+    .appName("Cloudant Spark SQL Example in Python using query")\
+    .config("cloudant.host","ACCOUNT.cloudant.com")\
+    .config("cloudant.username", "USERNAME")\
+    .config("cloudant.password","PASSWORD")\
+    .config("jsonstore.rdd.partitions", 8)\
+    .config("cloudant.useQuery", "true")\
+    .config("schemaSampleSize",1)\
+    .getOrCreate()
+
+
+# ***0. Loading dataframe from Cloudant db with one String field equality condition
+df = spark.read.load("n_airportcodemapping", "org.apache.bahir.cloudant")
+df.printSchema()
+df.filter(df.airportName == 'Moscow').select("_id",'airportName').show()
+
+
+# ***1. Loading dataframe from Cloudant db with one String field range condition
+df = spark.read.load("n_airportcodemapping", "org.apache.bahir.cloudant")
+df.printSchema()
+df.filter(df.airportName > 'Moscow').select("_id",'airportName').show()
+
+# ***2. Loading dataframe from Cloudant db with two String field conditions
+df = spark.read.load("n_airportcodemapping", "org.apache.bahir.cloudant")
+df.printSchema()
+df.filter(df.airportName > 'Moscow').filter(df.airportName < 'Sydney').select("_id",'airportName').show()
+
+# ***3. Loading dataframe from Cloudant db with two numeric field conditions
+df = spark.read.load("n_flight", "org.apache.bahir.cloudant")
+df.printSchema()
+df.filter(df.economyClassBaseCost >= 200).filter(df.numFirstClassSeats <=10).select('flightSegmentId','scheduledDepartureTime', 'scheduledArrivalTime').show()
+
+# ***4. Loading dataframe from Cloudant db with two timestamp field conditions
+df = spark.read.load("n_flight", "org.apache.bahir.cloudant")
+df.printSchema()
+df.filter(df.scheduledDepartureTime >= "2014-12-15T05:00:00.000Z").filter(df.scheduledArrivalTime <="2014-12-15T11:04:00.000Z").select('flightSegmentId','scheduledDepartureTime', 'scheduledArrivalTime').show()
+
+

http://git-wip-us.apache.org/repos/asf/incubator-livy-website/blob/fd4c35fc/sql-cloudant/src/main/resources/application.conf
----------------------------------------------------------------------
diff --git a/sql-cloudant/src/main/resources/application.conf b/sql-cloudant/src/main/resources/application.conf
index 2d8b236..80dea91 100644
--- a/sql-cloudant/src/main/resources/application.conf
+++ b/sql-cloudant/src/main/resources/application.conf
@@ -10,5 +10,7 @@ spark-sql {
     }
     cloudant = {
         protocol = https
+        useQuery = false
+        queryLimit = 25
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-livy-website/blob/fd4c35fc/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/CloudantConfig.scala
----------------------------------------------------------------------
diff --git a/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/CloudantConfig.scala b/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/CloudantConfig.scala
index ac14f4b..c4e27b9 100644
--- a/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/CloudantConfig.scala
+++ b/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/CloudantConfig.scala
@@ -34,20 +34,16 @@ class CloudantConfig(val protocol: String, val host: String,
     (implicit val username: String, val password: String,
     val partitions: Int, val maxInPartition: Int, val minInPartition: Int,
     val requestTimeout: Long, val bulkSize: Int, val schemaSampleSize: Int,
-    val createDBOnSave: Boolean, val selector: String)
+    val createDBOnSave: Boolean, val selector: String, val useQuery: Boolean = false,
+    val queryLimit: Int)
     extends Serializable{
 
-  private val SCHEMA_FOR_ALL_DOCS_NUM = -1
   private lazy val dbUrl = {protocol + "://" + host + "/" + dbName}
 
   val pkField = "_id"
   val defaultIndex = "_all_docs" // "_changes" does not work for partition
   val default_filter: String = "*:*"
 
-  def getChangesUrl(): String = {
-    dbUrl + "/_changes?include_docs=true&feed=normal"
-  }
-
   def getContinuousChangesUrl(): String = {
     var url = dbUrl + "/_changes?include_docs=true&feed=continuous&heartbeat=3000"
     if (selector != null) {
@@ -64,11 +60,6 @@ class CloudantConfig(val protocol: String, val host: String,
     dbUrl
   }
 
-  def getLastUrl(skip: Int): String = {
-    if (skip ==0 ) null
-    else s"$dbUrl/$defaultIndex?limit=$skip"
-  }
-
   def getSchemaSampleSize(): Int = {
     schemaSampleSize
   }
@@ -77,8 +68,6 @@ class CloudantConfig(val protocol: String, val host: String,
     createDBOnSave
   }
 
-  def getLastNum(result: JsValue): JsValue = (result \ "last_seq").get
-
   def getTotalUrl(url: String): String = {
     if (url.contains('?')) {
       url + "&limit=1"
@@ -91,37 +80,24 @@ class CloudantConfig(val protocol: String, val host: String,
     dbName
   }
 
-  def allowPartition(): Boolean = {indexName==null}
+  def queryEnabled(): Boolean = {useQuery && indexName==null && viewName==null}
 
-  def getOneUrl(): String = {
-    dbUrl + "/_all_docs?limit=1&include_docs=true"
-  }
+  def allowPartition(queryUsed: Boolean): Boolean = {indexName==null && !queryUsed}
 
-  def getOneUrlExcludeDDoc1(): String = {
-    dbUrl + "/_all_docs?endkey=%22_design/%22&limit=1&include_docs=true"
-  }
+  def getAllDocsUrl(limit: Int, excludeDDoc: Boolean = false): String = {
 
-  def getOneUrlExcludeDDoc2(): String = {
-    dbUrl + "/_all_docs?startkey=%22_design0/%22&limit=1&include_docs=true"
-  }
-
-  def getAllDocsUrlExcludeDDoc(limit: Int): String = {
     if (viewName == null) {
-      dbUrl + "/_all_docs?startkey=%22_design0/%22&limit=" + limit + "&include_docs=true"
-    } else {
-      dbUrl + "/" + viewName + "?limit=1"
-    }
-  }
-
-  def getAllDocsUrl(limit: Int): String = {
-    if (viewName == null) {
-      if (limit == SCHEMA_FOR_ALL_DOCS_NUM) {
-        dbUrl + "/_all_docs?include_docs=true"
+      val baseUrl = (
+          if ( excludeDDoc) dbUrl + "/_all_docs?startkey=%22_design0/%22&include_docs=true"
+          else dbUrl + "/_all_docs?include_docs=true"
+          )
+      if (limit == JsonStoreConfigManager.ALL_DOCS_LIMIT) {
+        baseUrl
       } else {
-        dbUrl + "/_all_docs?limit=" + limit + "&include_docs=true"
+        baseUrl + "&limit=" + limit
       }
     } else {
-      if (limit == JsonStoreConfigManager.SCHEMA_FOR_ALL_DOCS_NUM) {
+      if (limit == JsonStoreConfigManager.ALL_DOCS_LIMIT) {
         dbUrl + "/" + viewName
       } else {
         dbUrl + "/" + viewName + "?limit=" + limit
@@ -132,22 +108,23 @@ class CloudantConfig(val protocol: String, val host: String,
   def getRangeUrl(field: String = null, start: Any = null,
       startInclusive: Boolean = false, end: Any = null,
       endInclusive: Boolean = false,
-      includeDoc: Boolean = true): (String, Boolean) = {
-    val (url: String, pusheddown: Boolean) =
-      calculate(field, start, startInclusive, end, endInclusive)
-    if (includeDoc) {
+      includeDoc: Boolean = true,
+      allowQuery: Boolean = false): (String, Boolean, Boolean) = {
+    val (url: String, pusheddown: Boolean, queryUsed: Boolean) =
+      calculate(field, start, startInclusive, end, endInclusive, allowQuery)
+    if (includeDoc && !queryUsed ) {
       if (url.indexOf('?') > 0) {
-        (url + "&include_docs=true", pusheddown)
+        (url + "&include_docs=true", pusheddown, queryUsed)
       } else {
-        (url + "?include_docs=true", pusheddown)
+        (url + "?include_docs=true", pusheddown, queryUsed)
       }
     } else {
-      (url, pusheddown)
+      (url, pusheddown, queryUsed)
     }
   }
 
   private def calculate(field: String, start: Any, startInclusive: Boolean,
-      end: Any, endInclusive: Boolean): (String, Boolean) = {
+      end: Any, endInclusive: Boolean, allowQuery: Boolean): (String, Boolean, Boolean) = {
     if (field != null && field.equals(pkField)) {
       var condition = ""
       if (start != null && end != null && start.equals(end)) {
@@ -166,16 +143,18 @@ class CloudantConfig(val protocol: String, val host: String,
           condition += "endkey=%22" + URLEncoder.encode(end.toString(), "UTF-8") + "%22"
         }
       }
-      (dbUrl + "/_all_docs" + condition, true)
+      (dbUrl + "/_all_docs" + condition, true, false)
     } else if (indexName!=null) {
       //  push down to indexName
       val condition = calculateCondition(field, start, startInclusive,
         end, endInclusive)
-      (dbUrl + "/" + indexName + "?q=" + condition, true)
+      (dbUrl + "/" + indexName + "?q=" + condition, true, false)
     } else if (viewName != null) {
-      (dbUrl + "/" + viewName, true)
+      (dbUrl + "/" + viewName, false, false)
+    } else if (allowQuery && useQuery) {
+      (s"$dbUrl/_find", false, true)
     } else {
-      (s"$dbUrl/$defaultIndex", false)
+      (s"$dbUrl/$defaultIndex", false, false)
     }
 
   }
@@ -215,20 +194,21 @@ class CloudantConfig(val protocol: String, val host: String,
     }
   }
 
-  def getSubSetUrl (url: String, skip: Int, limit: Int)
-      (implicit convertSkip: (Int) => String): String = {
+  def getSubSetUrl (url: String, skip: Int, limit: Int, queryUsed: Boolean): String = {
     val suffix = {
       if (url.indexOf("_all_docs")>0) "include_docs=true&limit=" +
         limit + "&skip=" + skip
-      else if (url.indexOf("_changes")>0) "include_docs=true&limit=" +
-          limit + "&since=" + convertSkip(skip)
       else if (viewName != null) {
         "limit=" + limit + "&skip=" + skip
+      } else if (queryUsed) {
+        ""
       } else {
         "include_docs=true&limit=" + limit
       } // TODO Index query does not support subset query. Should disable Partitioned loading?
     }
-    if (url.indexOf('?') > 0) {
+    if (suffix.length==0) {
+      url
+    } else if (url.indexOf('?') > 0) {
       url + "&" + suffix
     }
     else {
@@ -246,8 +226,10 @@ class CloudantConfig(val protocol: String, val host: String,
     }
   }
 
-  def getRows(result: JsValue): Seq[JsValue] = {
-    if (viewName == null) {
+  def getRows(result: JsValue, queryUsed: Boolean): Seq[JsValue] = {
+    if ( queryUsed ) {
+      ((result \ "docs").as[JsArray]).value.map(row => row)
+    } else if ( viewName == null) {
       ((result \ "rows").as[JsArray]).value.map(row => (row \ "doc").get)
     } else {
       ((result \ "rows").as[JsArray]).value.map(row => row)

http://git-wip-us.apache.org/repos/asf/incubator-livy-website/blob/fd4c35fc/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/DefaultSource.scala
----------------------------------------------------------------------
diff --git a/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/DefaultSource.scala b/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/DefaultSource.scala
index 4c973f7..deab22a 100644
--- a/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/DefaultSource.scala
+++ b/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/DefaultSource.scala
@@ -23,7 +23,7 @@ import org.apache.spark.sql._
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types._
 
-import org.apache.bahir.cloudant.common.{FilterInterpreter, JsonStoreDataAccess, JsonStoreRDD, _}
+import org.apache.bahir.cloudant.common.{JsonStoreDataAccess, JsonStoreRDD, _}
 
 case class CloudantReadWriteRelation (config: CloudantConfig,
                                       schema: StructType,
@@ -49,23 +49,11 @@ case class CloudantReadWriteRelation (config: CloudantConfig,
           allDocsDF.select(requiredColumns(0), colsExceptCol0: _*).rdd
         }
       } else {
-        val filterInterpreter = new FilterInterpreter(filters)
-        var searchField: String = {
-          if (filterInterpreter.containsFiltersFor(config.pkField)) {
-            config.pkField
-          } else {
-            filterInterpreter.firstField
-          }
-        }
-
-        val (min, minInclusive, max, maxInclusive) = filterInterpreter.getInfo(searchField)
-        implicit val columns = requiredColumns
-        val (url: String, pusheddown: Boolean) = config.getRangeUrl(searchField,
-            min, minInclusive, max, maxInclusive, false)
-        if (!pusheddown) searchField = null
-        implicit val attrToFilters = filterInterpreter.getFiltersForPostProcess(searchField)
+        implicit val columns : Array[String] = requiredColumns
+        implicit val origFilters : Array[Filter] = filters
 
-        val cloudantRDD = new JsonStoreRDD(sqlContext.sparkContext, config, url)
+        logger.info("buildScan:" + columns + "," + origFilters)
+        val cloudantRDD = new JsonStoreRDD(sqlContext.sparkContext, config)
         val df = sqlContext.read.json(cloudantRDD)
         if (colsLength > 1) {
           val colsExceptCol0 = for (i <- 1 until colsLength) yield requiredColumns(i)
@@ -117,16 +105,10 @@ class DefaultSource extends RelationProvider
           inSchema
         } else {
           val df = if (config.getSchemaSampleSize() ==
-            JsonStoreConfigManager.SCHEMA_FOR_ALL_DOCS_NUM &&
+            JsonStoreConfigManager.ALL_DOCS_LIMIT &&
             config.viewName == null
             && config.indexName == null) {
-            val filterInterpreter = new FilterInterpreter(null)
-            var searchField = null
-            val (min, minInclusive, max, maxInclusive) =
-                filterInterpreter.getInfo(searchField)
-            val (url: String, pusheddown: Boolean) = config.getRangeUrl(searchField,
-                min, minInclusive, max, maxInclusive, false)
-            val cloudantRDD = new JsonStoreRDD(sqlContext.sparkContext, config, url)
+            val cloudantRDD = new JsonStoreRDD(sqlContext.sparkContext, config)
             allDocsDF = sqlContext.read.json(cloudantRDD)
             allDocsDF
           } else {

http://git-wip-us.apache.org/repos/asf/incubator-livy-website/blob/fd4c35fc/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/common/JsonStoreConfigManager.scala
----------------------------------------------------------------------
diff --git a/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/common/JsonStoreConfigManager.scala b/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/common/JsonStoreConfigManager.scala
index 92192bb..38c5006 100644
--- a/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/common/JsonStoreConfigManager.scala
+++ b/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/common/JsonStoreConfigManager.scala
@@ -25,13 +25,14 @@ import org.apache.bahir.cloudant.CloudantConfig
 
  object JsonStoreConfigManager {
   val CLOUDANT_CONNECTOR_VERSION = "2.0.0"
-  val SCHEMA_FOR_ALL_DOCS_NUM = -1
+  val ALL_DOCS_LIMIT = -1
 
   private val CLOUDANT_HOST_CONFIG = "cloudant.host"
   private val CLOUDANT_USERNAME_CONFIG = "cloudant.username"
   private val CLOUDANT_PASSWORD_CONFIG = "cloudant.password"
   private val CLOUDANT_PROTOCOL_CONFIG = "cloudant.protocol"
-
+  private val USE_QUERY_CONFIG = "cloudant.useQuery"
+  private val QUERY_LIMIT_CONFIG = "cloudant.queryLimit"
 
   private val PARTITION_CONFIG = "jsonstore.rdd.partitions"
   private val MAX_IN_PARTITION_CONFIG = "jsonstore.rdd.maxInPartition"
@@ -39,7 +40,7 @@ import org.apache.bahir.cloudant.CloudantConfig
   private val REQUEST_TIMEOUT_CONFIG = "jsonstore.rdd.requestTimeout"
   private val BULK_SIZE_CONFIG = "bulkSize"
   private val SCHEMA_SAMPLE_SIZE_CONFIG = "schemaSampleSize"
-  private val CREATE_DB_ON_SAVE = "createDBOnSave"
+  private val CREATE_DB_ON_SAVE_CONFIG = "createDBOnSave"
 
 
   private val configFactory = ConfigFactory.load()
@@ -139,6 +140,10 @@ import org.apache.bahir.cloudant.CloudantConfig
   def getConfig(context: SQLContext, parameters: Map[String, String]): CloudantConfig = {
 
     val sparkConf = context.sparkContext.getConf
+    getConfig(sparkConf, parameters)
+  }
+
+  def getConfig (sparkConf: SparkConf, parameters: Map[String, String]): CloudantConfig = {
 
     implicit val total = getInt(sparkConf, parameters, PARTITION_CONFIG)
     implicit val max = getInt(sparkConf, parameters, MAX_IN_PARTITION_CONFIG)
@@ -146,67 +151,28 @@ import org.apache.bahir.cloudant.CloudantConfig
     implicit val requestTimeout = getLong(sparkConf, parameters, REQUEST_TIMEOUT_CONFIG)
     implicit val bulkSize = getInt(sparkConf, parameters, BULK_SIZE_CONFIG)
     implicit val schemaSampleSize = getInt(sparkConf, parameters, SCHEMA_SAMPLE_SIZE_CONFIG)
-    implicit val createDBOnSave = getBool(sparkConf, parameters, CREATE_DB_ON_SAVE)
+    implicit val createDBOnSave = getBool(sparkConf, parameters, CREATE_DB_ON_SAVE_CONFIG)
+
+    implicit val useQuery = getBool(sparkConf, parameters, USE_QUERY_CONFIG)
+    implicit val queryLimit = getInt(sparkConf, parameters, QUERY_LIMIT_CONFIG)
 
     val dbName = parameters.getOrElse("database", parameters.getOrElse("path", null))
     val indexName = parameters.getOrElse("index", null)
     val viewName = parameters.getOrElse("view", null)
-
-    // FIXME: Add logger
-    // scalastyle:off println
-    println(s"Use connectorVersion=$CLOUDANT_CONNECTOR_VERSION, dbName=$dbName, " +
-        s"indexName=$indexName, viewName=$viewName," +
-        s"$PARTITION_CONFIG=$total, $MAX_IN_PARTITION_CONFIG=$max," +
-        s"$MIN_IN_PARTITION_CONFIG=$min, $REQUEST_TIMEOUT_CONFIG=$requestTimeout," +
-        s"$BULK_SIZE_CONFIG=$bulkSize, $SCHEMA_SAMPLE_SIZE_CONFIG=$schemaSampleSize")
-    // scalastyle:on println
+    val selector = parameters.getOrElse("selector", null)
 
     val protocol = getString(sparkConf, parameters, CLOUDANT_PROTOCOL_CONFIG)
     val host = getString( sparkConf, parameters, CLOUDANT_HOST_CONFIG)
     val user = getString(sparkConf, parameters, CLOUDANT_USERNAME_CONFIG)
     val passwd = getString(sparkConf, parameters, CLOUDANT_PASSWORD_CONFIG)
-    val selector = getString(sparkConf, parameters, "selector")
 
     if (host != null) {
       new CloudantConfig(protocol, host, dbName, indexName,
         viewName) (user, passwd, total, max, min, requestTimeout, bulkSize,
-        schemaSampleSize, createDBOnSave, selector)
+        schemaSampleSize, createDBOnSave, selector, useQuery, queryLimit)
     } else {
       throw new RuntimeException("Spark configuration is invalid! " +
         "Please make sure to supply required values for cloudant.host.")
       }
   }
-
-  def getConfig(sparkConf: SparkConf, parameters: Map[String, String]): CloudantConfig = {
-
-    implicit val total = getInt(sparkConf, parameters, PARTITION_CONFIG)
-    implicit val max = getInt(sparkConf, parameters, MAX_IN_PARTITION_CONFIG)
-    implicit val min = getInt(sparkConf, parameters, MIN_IN_PARTITION_CONFIG)
-    implicit val requestTimeout = getLong(sparkConf, parameters, REQUEST_TIMEOUT_CONFIG)
-    implicit val bulkSize = getInt(sparkConf, parameters, BULK_SIZE_CONFIG)
-    implicit val schemaSampleSize = getInt(sparkConf, parameters, SCHEMA_SAMPLE_SIZE_CONFIG)
-    implicit val createDBOnSave = getBool(sparkConf, parameters, CREATE_DB_ON_SAVE)
-
-    val dbName = parameters.getOrElse("database", null)
-
-    // scalastyle:off println
-    println(s"Use connectorVersion=$CLOUDANT_CONNECTOR_VERSION, dbName=$dbName, " +
-      s"$REQUEST_TIMEOUT_CONFIG=$requestTimeout")
-    // scalastyle:on println
-
-    val protocol = getString(sparkConf, parameters, CLOUDANT_PROTOCOL_CONFIG)
-    val host = getString( sparkConf, parameters, CLOUDANT_HOST_CONFIG)
-    val user = getString(sparkConf, parameters, CLOUDANT_USERNAME_CONFIG)
-    val passwd = getString(sparkConf, parameters, CLOUDANT_PASSWORD_CONFIG)
-    val selector = getString(sparkConf, parameters, "selector")
-
-    if (host != null) {
-      new CloudantConfig(protocol, host, dbName)(user, passwd,
-        total, max, min, requestTimeout, bulkSize,
-        schemaSampleSize, createDBOnSave, selector)
-    } else {
-      throw new RuntimeException("Cloudant parameters are invalid!" +
-          "Please make sure to supply required values for cloudant.host.")
-    }
-  }
 }
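
Since getConfig now reads the same keys from both the SparkConf and the per-relation options map (via the getInt/getBool/getString helpers), the options can apparently also be supplied per read rather than only on the session. A sketch in Python (all values are placeholders):

    df = (spark.read
          .format("org.apache.bahir.cloudant")
          .option("cloudant.host", "ACCOUNT.cloudant.com")
          .option("cloudant.username", "USERNAME")
          .option("cloudant.password", "PASSWORD")
          .option("cloudant.useQuery", "true")
          .option("database", "n_flight")  # or pass the name to load()
          .load())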

http://git-wip-us.apache.org/repos/asf/incubator-livy-website/blob/fd4c35fc/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/common/JsonStoreDataAccess.scala
----------------------------------------------------------------------
diff --git a/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/common/JsonStoreDataAccess.scala b/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/common/JsonStoreDataAccess.scala
index e84a44c..ac79359 100644
--- a/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/common/JsonStoreDataAccess.scala
+++ b/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/common/JsonStoreDataAccess.scala
@@ -39,19 +39,6 @@ class JsonStoreDataAccess (config: CloudantConfig)  {
   lazy val logger = LoggerFactory.getLogger(getClass)
   implicit lazy val timeout = config.requestTimeout
 
-  def getOne()( implicit columns: Array[String] = null): Seq[String] = {
-    var r = this.getQueryResult[Seq[String]](config.getOneUrlExcludeDDoc1(), processAll)
-    if (r.size == 0 ) {
-      r = this.getQueryResult[Seq[String]](config.getOneUrlExcludeDDoc2(), processAll)
-    }
-    if (r.size == 0) {
-      throw new RuntimeException("Database " + config.getDbname() +
-        " doesn't have any non-design documents!")
-    } else {
-      r
-    }
-  }
-
   def getMany(limit: Int)(implicit columns: Array[String] = null): Seq[String] = {
     if (limit == 0) {
       throw new RuntimeException("Database " + config.getDbname() +
@@ -63,7 +50,7 @@ class JsonStoreDataAccess (config: CloudantConfig)  {
     }
     var r = this.getQueryResult[Seq[String]](config.getAllDocsUrl(limit), processAll)
     if (r.size == 0) {
-      r = this.getQueryResult[Seq[String]](config.getAllDocsUrlExcludeDDoc(limit), processAll)
+      r = this.getQueryResult[Seq[String]](config.getAllDocsUrl(limit, true), processAll)
     }
     if (r.size == 0) {
       throw new RuntimeException("Database " + config.getDbname() +
@@ -74,40 +61,34 @@ class JsonStoreDataAccess (config: CloudantConfig)  {
   }
 
   def getAll[T](url: String)
-      (implicit columns: Array[String] = null,
-      attrToFilters: Map[String, Array[Filter]] = null): Seq[String] = {
+      (implicit columns: Array[String] = null): Seq[String] = {
     this.getQueryResult[Seq[String]](url, processAll)
   }
 
   def getIterator(skip: Int, limit: Int, url: String)
       (implicit columns: Array[String] = null,
-      attrToFilters: Map[String, Array[Filter]] = null): Iterator[String] = {
-    implicit def convertSkip(skip: Int): String = {
-      val url = config.getLastUrl(skip)
-      if (url == null) {
-        skip.toString()
-      } else {
-        this.getQueryResult[String](url,
-          { result => config.getLastNum(Json.parse(result)).as[JsString].value})
-      }
-    }
-    val newUrl = config.getSubSetUrl(url, skip, limit)
+      postData: String = null): Iterator[String] = {
+    val newUrl = config.getSubSetUrl(url, skip, limit, postData!=null)
     this.getQueryResult[Iterator[String]](newUrl, processIterator)
   }
 
-  def getTotalRows(url: String): Int = {
-    val totalUrl = config.getTotalUrl(url)
-    this.getQueryResult[Int](totalUrl,
-        { result => config.getTotalRows(Json.parse(result))})
+  def getTotalRows(url: String, queryUsed: Boolean)
+      (implicit postData: String = null): Int = {
+      if (queryUsed) config.queryLimit // _find cannot report the total row count, so fall back to queryLimit.
+      else {
+        val totalUrl = config.getTotalUrl(url)
+        this.getQueryResult[Int](totalUrl,
+          { result => config.getTotalRows(Json.parse(result))})
+      }
   }
 
   private def processAll (result: String)
       (implicit columns: Array[String],
-      attrToFilters: Map[String, Array[Filter]] = null) = {
-    logger.debug(s"processAll columns:$columns, attrToFilters:$attrToFilters")
+      postData: String = null) = {
+    logger.debug(s"processAll:$result, columns:$columns")
     val jsonResult: JsValue = Json.parse(result)
-    var rows = config.getRows(jsonResult)
-    if (config.viewName == null) {
+    var rows = config.getRows(jsonResult, postData!=null )
+    if (config.viewName == null && postData==null) {
       // filter design docs
       rows = rows.filter(r => FilterDDocs.filter(r))
     }
@@ -116,7 +97,7 @@ class JsonStoreDataAccess (config: CloudantConfig)  {
 
   private def processIterator (result: String)
     (implicit columns: Array[String],
-    attrToFilters: Map[String, Array[Filter]] = null): Iterator[String] = {
+      postData: String = null): Iterator[String] = {
     processAll(result).iterator
   }
 
@@ -137,23 +118,39 @@ class JsonStoreDataAccess (config: CloudantConfig)  {
     getQueryResult(url, processResults)
   }
 
-
   private def getQueryResult[T]
       (url: String, postProcessor: (String) => T)
       (implicit columns: Array[String] = null,
-      attrToFilters: Map[String, Array[Filter]] = null) : T = {
-    logger.warn("Loading data from Cloudant using query: " + url)
+      postData: String = null) : T = {
+    logger.info(s"Loading data from Cloudant using: $url , postData: $postData")
     val requestTimeout = config.requestTimeout.toInt
     val clRequest: HttpRequest = config.username match {
       case null =>
-        Http(url)
+        if (postData!=null) {
+          Http(url)
+          .postData(postData)
+          .timeout(connTimeoutMs = 1000, readTimeoutMs = requestTimeout)
+          .header("Content-Type", "application/json")
+          .header("User-Agent", "spark-cloudant")
+        } else {
+          Http(url)
             .timeout(connTimeoutMs = 1000, readTimeoutMs = requestTimeout)
             .header("User-Agent", "spark-cloudant")
+        }
       case _ =>
-        Http(url)
+        if (postData!=null) {
+          Http(url)
+          .postData(postData)
+          .timeout(connTimeoutMs = 1000, readTimeoutMs = requestTimeout)
+          .header("Content-Type", "application/json")
+          .header("User-Agent", "spark-cloudant")
+          .auth(config.username, config.password)
+        } else {
+          Http(url)
             .timeout(connTimeoutMs = 1000, readTimeoutMs = requestTimeout)
             .header("User-Agent", "spark-cloudant")
             .auth(config.username, config.password)
+        }
     }
 
     val clResponse: HttpResponse[String] = clRequest.execute()
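
When postData is non-null the request becomes an HTTP POST carrying the JSON body (with Content-Type set); otherwise it remains a GET. A rough equivalent of the four branches, written with the Python requests library purely for illustration (url, body, and credentials are placeholders):

    import requests

    def cloudant_request(url, body=None, username=None, password=None,
                         read_timeout_s=60):
        headers = {"User-Agent": "spark-cloudant"}
        auth = (username, password) if username else None
        if body is not None:  # Cloudant Query: POST the selector to /_find
            headers["Content-Type"] = "application/json"
            resp = requests.post(url, data=body, headers=headers, auth=auth,
                                 timeout=(1, read_timeout_s))
        else:                 # _all_docs / view / index: plain GET
            resp = requests.get(url, headers=headers, auth=auth,
                                timeout=(1, read_timeout_s))
        return resp.text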

http://git-wip-us.apache.org/repos/asf/incubator-livy-website/blob/fd4c35fc/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/common/JsonStoreRDD.scala
----------------------------------------------------------------------
diff --git a/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/common/JsonStoreRDD.scala b/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/common/JsonStoreRDD.scala
index 46774f5..46ba912 100644
--- a/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/common/JsonStoreRDD.scala
+++ b/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/common/JsonStoreRDD.scala
@@ -17,12 +17,13 @@
 package org.apache.bahir.cloudant.common
 
 import org.slf4j.LoggerFactory
+import play.api.libs.json.{JsNull, Json, JsString, JsValue}
 
 import org.apache.spark.Partition
 import org.apache.spark.SparkContext
 import org.apache.spark.TaskContext
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.sources.Filter
+import org.apache.spark.sql.sources._
 
 import org.apache.bahir.cloudant.CloudantConfig
 
@@ -31,9 +32,9 @@ import org.apache.bahir.cloudant.CloudantConfig
   * the limit rows returns and the skipped rows.
  */
 
-private[cloudant] class JsonStoreRDDPartition(val skip: Int, val limit: Int,
-    val idx: Int, val config: CloudantConfig,
-    val attrToFilters: Map[String, Array[Filter]])
+private[cloudant] class JsonStoreRDDPartition(val url: String, val skip: Int, val limit: Int,
+    val idx: Int, val config: CloudantConfig, val selector: JsValue, val fields: JsValue,
+    val queryUsed: Boolean)
     extends Partition with Serializable{
   val index = idx
 }
@@ -46,16 +47,15 @@ private[cloudant] class JsonStoreRDDPartition(val skip: Int, val limit: Int,
  *  and minInPartition / maxInPartition )
  *  maxRowsInPartition: -1 means unlimited
  */
-class JsonStoreRDD(sc: SparkContext, config: CloudantConfig,
-    url: String)(implicit requiredcolumns: Array[String] = null,
-    attrToFilters: Map[String, Array[Filter]] = null)
+class JsonStoreRDD(sc: SparkContext, config: CloudantConfig)
+    (implicit requiredcolumns: Array[String] = null,
+              filters: Array[Filter] = null)
   extends RDD[String](sc, Nil) {
 
-  lazy val totalRows = {
-      new JsonStoreDataAccess(config).getTotalRows(url)
-  }
-  lazy val totalPartition = {
-    if (totalRows == 0 || ! config.allowPartition() )  1
+  private val logger = LoggerFactory.getLogger(getClass)
+
+  private def getTotalPartition(totalRows: Int, queryUsed: Boolean): Int = {
+    if (totalRows == 0 || ! config.allowPartition(queryUsed) )  1
     else if (totalRows < config.partitions * config.minInPartition) {
       val total = totalRows / config.minInPartition
       if (total == 0 ) {
@@ -76,7 +76,7 @@ class JsonStoreRDD(sc: SparkContext, config: CloudantConfig,
     }
   }
 
-  lazy val limitPerPartition = {
+  private def getLimitPerPartition(totalRows: Int, totalPartition: Int): Int = {
     val limit = totalRows/totalPartition
     if (totalRows % totalPartition != 0) {
       limit + 1
@@ -85,22 +85,115 @@ class JsonStoreRDD(sc: SparkContext, config: CloudantConfig,
     }
   }
 
+  private def convertToMangoJson(f: Filter): (String, JsValue) = {
+    val (op, value): (String, Any) = f match {
+      case EqualTo(attr, v) => ("$eq", v)
+      case GreaterThan(attr, v) => ("$gt", v)
+      case LessThan(attr, v) => ("$lt", v)
+      case GreaterThanOrEqual(attr, v) => ("$gte", v)
+      case LessThanOrEqual(attr, v) => ("$lte", v)
+      case _ => (null, null)
+    }
+    val convertedV: JsValue = {
+      // TODO Better handing of other types
+      if (value != null) {
+        value match {
+          case s: String => Json.toJson(s)
+          case l: Long => Json.toJson(l)
+          case d: Double => Json.toJson(d)
+          case i: Int => Json.toJson(i)
+          case b: Boolean => Json.toJson(b)
+          case t: java.sql.Timestamp => Json.toJson(t)
+          case a: Any => logger.debug(s"Ignoring filter $f, cannot handle its datatype: $a"); null
+        }
+      } else null
+    }
+    (op, convertedV)
+  }
+
+  private def convertAttrToMangoJson(filters: Array[Filter]): Map[String, JsValue] = {
+    filters.map(af => convertToMangoJson(af))
+            .filter(x => x._2 != null)
+            .toMap
+  }
+
   override def getPartitions: Array[Partition] = {
-    val logger = LoggerFactory.getLogger(getClass)
+
+    logger.info("getPartitions:" + requiredcolumns + "," + filters)
+
+    val filterInterpreter = new FilterInterpreter(filters)
+    val origAttrToFilters = ( if (filters==null || filters.length==0) null
+                              else filterInterpreter.getFiltersForPostProcess(null))
+
+    val (selector, fields) : (JsValue, JsValue) = {
+      if (!config.queryEnabled() || origAttrToFilters == null) (null, null)
+      else {
+        val selectors: Map[String, Map[String, JsValue]] =
+          origAttrToFilters.transform( (name, attrFilters) => convertAttrToMangoJson(attrFilters))
+        val filteredSelectors = selectors.filter((t) => ! t._2.isEmpty)
+
+        if (! filteredSelectors.isEmpty) {
+          val queryColumns = (
+              if (requiredcolumns == null || requiredcolumns.size == 0) null
+              else Json.toJson(requiredcolumns))
+          (Json.toJson(filteredSelectors), queryColumns)
+        } else (null, null)
+      }
+    }
+
+    logger.info("calculated selector and fields:" + selector + "," + fields)
+
+    var searchField: String = {
+      if (origAttrToFilters == null) null
+      else if (filterInterpreter.containsFiltersFor(config.pkField)) {
+        config.pkField
+      } else {
+        filterInterpreter.firstField
+      }
+    }
+
+    val (min, minInclusive, max, maxInclusive) = filterInterpreter.getInfo(searchField)
+    val (url: String, pusheddown: Boolean, queryUsed: Boolean) = config.getRangeUrl(searchField,
+            min, minInclusive, max, maxInclusive, false, selector!=null)
+
+    implicit val postData : String = {
+      if (queryUsed) {
+        Json.stringify(Json.obj("selector" -> selector, "limit" -> 1))
+      } else {
+        null
+      }
+    }
+    val totalRows = new JsonStoreDataAccess(config).getTotalRows(url, queryUsed)
+    val totalPartition = getTotalPartition(totalRows, queryUsed)
+    val limitPerPartition = getLimitPerPartition(totalRows, totalPartition)
+
     logger.info(s"Partition config - total=$totalPartition, " +
         s"limit=$limitPerPartition for totalRows of $totalRows")
 
-    (0 until totalPartition).map(i => {
+    logger.info(s"Partition query info - url=$url, queryUsed=$queryUsed")
+
+    (0 until totalPartition).map(i => {
       val skip = i * limitPerPartition
-      new JsonStoreRDDPartition(skip, limitPerPartition, i, config,
-          attrToFilters).asInstanceOf[Partition]
+      new JsonStoreRDDPartition(url, skip, limitPerPartition, i,
+          config, selector, fields, queryUsed).asInstanceOf[Partition]
     }).toArray
   }
 
   override def compute(splitIn: Partition, context: TaskContext):
       Iterator[String] = {
     val myPartition = splitIn.asInstanceOf[JsonStoreRDDPartition]
+    implicit val postData : String = {
+      if (myPartition.queryUsed && myPartition.fields !=null) {
+        Json.stringify(Json.obj("selector" -> myPartition.selector, "fields" -> myPartition.fields,
+            "limit" -> myPartition.limit, "skip" -> myPartition.skip))
+      } else if (myPartition.queryUsed) {
+        Json.stringify(Json.obj("selector" -> myPartition.selector, "limit" -> myPartition.limit,
+            "skip" -> myPartition.skip))
+      } else {
+        null
+      }
+    }
     new JsonStoreDataAccess(myPartition.config).getIterator(myPartition.skip,
-        myPartition.limit, url)
+        myPartition.limit, myPartition.url)
   }
 }
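
Each partition then issues its own request with skip = i * limitPerPartition. Note that when queryUsed is true, allowPartition returns false, so the whole _find scan runs in a single partition whose limit is cloudant.queryLimit (getTotalRows returns queryLimit because _find cannot report a total). A small Python sketch of the visible arithmetic (the partition count is taken as an input here):

    def limit_per_partition(total_rows, total_partitions):
        # Mirrors getLimitPerPartition: ceiling division
        limit = total_rows // total_partitions
        return limit + 1 if total_rows % total_partitions != 0 else limit

    total_rows, total_partitions = 100, 8
    limit = limit_per_partition(total_rows, total_partitions)  # 13
    for i in range(total_partitions):
        skip = i * limit
        # queryUsed:  POST {"selector": ..., "limit": limit, "skip": skip}
        # otherwise:  GET <url>&limit=<limit>&skip=<skip>
        print(i, skip, limit)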