You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bahir.apache.org by lr...@apache.org on 2018/01/26 15:43:53 UTC

bahir git commit: [BAHIR-154] Refactor sql-cloudant to use cloudant-client library

Repository: bahir
Updated Branches:
  refs/heads/master 785ee1e1a -> 6ea42a896


[BAHIR-154] Refactor sql-cloudant to use cloudant-client library

- Use java-cloudant’s executeRequest for HTTP requests against
  _all_docs endpoint
- Added HTTP 429 backoff with default settings
- Simplified caught exception and message for schema size
- Replaced scala http library with okhttp library for changes receiver
- Updated streaming CloudantReceiver class to use improved
  ChangesRowScanner method
- Replaced Play JSON with GSON library
- Updated save operation to use java-cloudant bulk API
- Use _changes feed filter option for Cloudant/CouchDB 2.x and greater

Closes #61


Project: http://git-wip-us.apache.org/repos/asf/bahir/repo
Commit: http://git-wip-us.apache.org/repos/asf/bahir/commit/6ea42a89
Tree: http://git-wip-us.apache.org/repos/asf/bahir/tree/6ea42a89
Diff: http://git-wip-us.apache.org/repos/asf/bahir/diff/6ea42a89

Branch: refs/heads/master
Commit: 6ea42a8965d98773a342ad5cd31aab6b64e9d9bd
Parents: 785ee1e
Author: Esteban Laver <em...@us.ibm.com>
Authored: Sun Dec 17 13:30:04 2017 -0500
Committer: Luciano Resende <lr...@apache.org>
Committed: Fri Jan 26 07:43:07 2018 -0800

----------------------------------------------------------------------
 sql-cloudant/README.md                          |   1 +
 sql-cloudant/pom.xml                            |  21 +--
 .../src/main/resources/application.conf         |   1 +
 .../bahir/cloudant/CloudantChangesConfig.scala  |  15 +-
 .../apache/bahir/cloudant/CloudantConfig.scala  | 136 +++++++++------
 .../bahir/cloudant/CloudantReceiver.scala       |  77 ++++-----
 .../apache/bahir/cloudant/DefaultSource.scala   |  13 +-
 .../bahir/cloudant/common/FilterUtil.scala      |  15 +-
 .../common/JsonStoreConfigManager.scala         |  63 ++++---
 .../cloudant/common/JsonStoreDataAccess.scala   | 172 ++++++-------------
 .../bahir/cloudant/common/JsonStoreRDD.scala    |  55 +++---
 .../apache/bahir/cloudant/common/JsonUtil.scala |  33 ++--
 .../cloudant/internal/ChangesReceiver.scala     |  69 ++++----
 .../bahir/cloudant/CloudantOptionSuite.scala    |  33 +++-
 14 files changed, 358 insertions(+), 346 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/bahir/blob/6ea42a89/sql-cloudant/README.md
----------------------------------------------------------------------
diff --git a/sql-cloudant/README.md b/sql-cloudant/README.md
index 160e279..b651990 100644
--- a/sql-cloudant/README.md
+++ b/sql-cloudant/README.md
@@ -63,6 +63,7 @@ cloudant.protocol|https|protocol to use to transfer data: http or https
 cloudant.host| |cloudant host url
 cloudant.username| |cloudant userid
 cloudant.password| |cloudant password
+cloudant.numberOfRetries|3| number of times to replay a request that received a 429 `Too Many Requests` response
 cloudant.useQuery|false|by default, `_all_docs` endpoint is used if configuration 'view' and 'index' (see below) are not set. When useQuery is enabled, `_find` endpoint will be used in place of `_all_docs` when query condition is not on primary key field (_id), so that query predicates may be driven into datastore. 
 cloudant.queryLimit|25|the maximum number of results returned when querying the `_find` endpoint.
 cloudant.storageLevel|MEMORY_ONLY|the storage level for persisting Spark RDDs during load when `cloudant.endpoint` is set to `_changes`.  See [RDD Persistence section](https://spark.apache.org/docs/latest/programming-guide.html#rdd-persistence) in Spark's Progamming Guide for all available storage level options.

http://git-wip-us.apache.org/repos/asf/bahir/blob/6ea42a89/sql-cloudant/pom.xml
----------------------------------------------------------------------
diff --git a/sql-cloudant/pom.xml b/sql-cloudant/pom.xml
index 55a5210..672418a 100644
--- a/sql-cloudant/pom.xml
+++ b/sql-cloudant/pom.xml
@@ -36,8 +36,14 @@
 
   <dependencies>
     <dependency>
-      <groupId>com.typesafe.play</groupId>
-      <artifactId>play-json_${scala.binary.version}</artifactId>
+      <groupId>com.cloudant</groupId>
+      <artifactId>cloudant-client</artifactId>
+      <version>2.11.0</version>
+    </dependency>
+    <dependency>
+      <groupId>com.squareup.okhttp3</groupId>
+      <artifactId>okhttp</artifactId>
+      <version>3.9.0</version>
     </dependency>
     <dependency>
       <groupId>com.typesafe</groupId>
@@ -45,11 +51,6 @@
       <version>1.3.1</version>
     </dependency>
     <dependency>
-      <groupId>org.joda</groupId>
-      <artifactId>joda-convert</artifactId>
-      <version>1.8.1</version>
-    </dependency>
-    <dependency>
       <groupId>com.fasterxml.jackson.core</groupId>
       <artifactId>jackson-annotations</artifactId>
       <version>2.6.7</version>
@@ -106,12 +107,6 @@
       <artifactId>scalacheck_${scala.binary.version}</artifactId>
       <scope>test</scope>
     </dependency>
-    <dependency>
-      <groupId>com.cloudant</groupId>
-      <artifactId>cloudant-client</artifactId>
-      <version>2.11.0</version>
-      <scope>test</scope>
-    </dependency>
   </dependencies>
   <build>
 	 <resources>

http://git-wip-us.apache.org/repos/asf/bahir/blob/6ea42a89/sql-cloudant/src/main/resources/application.conf
----------------------------------------------------------------------
diff --git a/sql-cloudant/src/main/resources/application.conf b/sql-cloudant/src/main/resources/application.conf
index 6ff4139..7f7beaf 100644
--- a/sql-cloudant/src/main/resources/application.conf
+++ b/sql-cloudant/src/main/resources/application.conf
@@ -11,6 +11,7 @@ spark-sql {
     cloudant = {
         batchInterval = 8
         endpoint = "_all_docs"
+        numberOfRetries = 3
         protocol = https
         useQuery = false
         queryLimit = 25

http://git-wip-us.apache.org/repos/asf/bahir/blob/6ea42a89/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/CloudantChangesConfig.scala
----------------------------------------------------------------------
diff --git a/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/CloudantChangesConfig.scala b/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/CloudantChangesConfig.scala
index 9f2a7ba..0615e16 100644
--- a/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/CloudantChangesConfig.scala
+++ b/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/CloudantChangesConfig.scala
@@ -27,10 +27,10 @@ class CloudantChangesConfig(protocol: String, host: String, dbName: String,
                             bulkSize: Int, schemaSampleSize: Int,
                             createDBOnSave: Boolean, endpoint: String, selector: String,
                             timeout: Int, storageLevel: StorageLevel, useQuery: Boolean,
-                            queryLimit: Int, batchInterval: Int)
+                            queryLimit: Int, batchInterval: Int, numberOfRetries: Int)
   extends CloudantConfig(protocol, host, dbName, indexName, viewName)(username, password,
     partitions, maxInPartition, minInPartition, requestTimeout, bulkSize, schemaSampleSize,
-    createDBOnSave, endpoint, useQuery, queryLimit) {
+    createDBOnSave, endpoint, useQuery, queryLimit, numberOfRetries) {
 
   override val defaultIndex: String = endpoint
 
@@ -42,9 +42,14 @@ class CloudantChangesConfig(protocol: String, host: String, dbName: String,
     if (selector != null && !selector.isEmpty) {
       selector
     } else {
-      // Exclude design docs and deleted=true docs
-      "{ \"_id\": { \"$regex\": \"^(?!_design/)\" }, " +
-        "\"_deleted\": { \"$exists\": false } }"
+      val version = getClient.serverVersion
+      if (version.matches("1.*")) {
+        null
+      } else {
+        // Exclude design docs and deleted=true docs
+        "{ \"_id\": { \"$regex\": \"^(?!_design/)\" }, " +
+          "\"_deleted\": { \"$exists\": false } }"
+      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/bahir/blob/6ea42a89/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/CloudantConfig.scala
----------------------------------------------------------------------
diff --git a/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/CloudantConfig.scala b/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/CloudantConfig.scala
index 8affd4f..c7370f0 100644
--- a/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/CloudantConfig.scala
+++ b/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/CloudantConfig.scala
@@ -16,11 +16,21 @@
  */
 package org.apache.bahir.cloudant
 
-import java.net.URLEncoder
+import java.net.{URL, URLEncoder}
 
-import play.api.libs.json.{JsArray, JsObject, Json, JsValue}
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ListBuffer
+import scala.reflect.io.File
+
+import com.cloudant.client.api.{ClientBuilder, CloudantClient, Database}
+import com.cloudant.client.api.model.SearchResult
+import com.cloudant.client.api.views._
+import com.cloudant.http.{Http, HttpConnection}
+import com.cloudant.http.interceptors.Replay429Interceptor
+import com.google.gson.{JsonObject, JsonParser}
 
 import org.apache.bahir.cloudant.common._
+import org.apache.bahir.cloudant.common.JsonUtil.JsonConverter
 
 /*
 * Only allow one field pushdown now
@@ -28,22 +38,48 @@ import org.apache.bahir.cloudant.common._
 */
 
 class CloudantConfig(val protocol: String, val host: String,
-                     val dbName: String, val indexName: String, val viewName: String)
+                     val dbName: String, val indexPath: String, val viewPath: String)
                     (implicit val username: String, val password: String,
                      val partitions: Int, val maxInPartition: Int, val minInPartition: Int,
                      val requestTimeout: Long, val bulkSize: Int, val schemaSampleSize: Int,
                      val createDBOnSave: Boolean, val endpoint: String,
-                     val useQuery: Boolean = false, val queryLimit: Int)
+                     val useQuery: Boolean = false, val queryLimit: Int,
+                     val numberOfRetries: Int)
   extends Serializable {
 
+  @transient private lazy val client: CloudantClient = ClientBuilder
+    .url(getClientUrl)
+    .username(username)
+    .password(password)
+    .interceptors(new Replay429Interceptor(numberOfRetries, 250L))
+    .build
+  @transient private lazy val database: Database = client.database(dbName, false)
   lazy val dbUrl: String = {protocol + "://" + host + "/" + dbName}
 
   val pkField = "_id"
   val defaultIndex: String = endpoint
   val default_filter: String = "*:*"
 
-  def getDbUrl: String = {
-    dbUrl
+  def executeRequest(stringUrl: String, postData: String = null): HttpConnection = {
+    val url = new URL(stringUrl)
+    if(postData != null) {
+      val conn = Http.POST(url, "application/json")
+      conn.setRequestBody(postData)
+      conn.requestProperties.put("User-Agent", "spark-cloudant")
+      client.executeRequest(conn)
+    } else {
+      val conn = Http.GET(url)
+      conn.requestProperties.put("User-Agent", "spark-cloudant")
+      client.executeRequest(conn)
+    }
+  }
+
+  def getClient: CloudantClient = {
+    client
+  }
+
+  def getDatabase: Database = {
+    database
   }
 
   def getSchemaSampleSize: Int = {
@@ -54,20 +90,20 @@ class CloudantConfig(val protocol: String, val host: String,
     createDBOnSave
   }
 
-  def getLastNum(result: JsValue): JsValue = (result \ "last_seq").get
+  def getClientUrl: URL = {
+    new URL(protocol + "://" + host)
+  }
+
+  def getLastNum(result: JsonObject): JsonObject = result.get("last_seq").getAsJsonObject
 
   /* Url containing limit for docs in a Cloudant database.
   * If a view is not defined, use the _all_docs endpoint.
   * @return url with one doc limit for retrieving total doc count
   */
   def getUrl(limit: Int, excludeDDoc: Boolean = false): String = {
-    if (viewName == null) {
+    if (viewPath == null) {
       val baseUrl = {
-        if (excludeDDoc) {
-          dbUrl + "/_all_docs?startkey=%22_design0/%22&include_docs=true"
-        } else {
-          dbUrl + "/_all_docs?include_docs=true"
-        }
+        dbUrl + "/_all_docs?include_docs=true"
       }
       if (limit == JsonStoreConfigManager.ALLDOCS_OR_CHANGES_LIMIT) {
         baseUrl
@@ -76,16 +112,16 @@ class CloudantConfig(val protocol: String, val host: String,
       }
     } else {
       if (limit == JsonStoreConfigManager.ALLDOCS_OR_CHANGES_LIMIT) {
-        dbUrl + "/" + viewName
+        dbUrl + "/" + viewPath
       } else {
-        dbUrl + "/" + viewName + "?limit=" + limit
+        dbUrl + "/" + viewPath + "?limit=" + limit
       }
     }
   }
 
-  /* Url containing limit to count total docs in a Cloudant database.
+  /* Url used to retrieve the total count of documents in a Cloudant database.
   *
-  * @return url with one doc limit for retrieving total doc count
+  * @return url string for retrieving the total doc count
   */
   def getTotalUrl(url: String): String = {
     if (url.contains('?')) {
@@ -100,10 +136,10 @@ class CloudantConfig(val protocol: String, val host: String,
   }
 
   def queryEnabled: Boolean = {
-    useQuery && indexName == null && viewName == null
+    useQuery && indexPath == null && viewPath == null
   }
 
-  def allowPartition(queryUsed: Boolean): Boolean = {indexName==null && !queryUsed}
+  def allowPartition(queryUsed: Boolean): Boolean = {indexPath == null && !queryUsed}
 
   def getRangeUrl(field: String = null, start: Any = null,
                   startInclusive: Boolean = false, end: Any = null,
@@ -112,7 +148,7 @@ class CloudantConfig(val protocol: String, val host: String,
                   allowQuery: Boolean = false): (String, Boolean, Boolean) = {
     val (url: String, pusheddown: Boolean, queryUsed: Boolean) =
       calculate(field, start, startInclusive, end, endInclusive, allowQuery)
-    if (includeDoc && !queryUsed ) {
+    if (includeDoc && !queryUsed) {
       if (url.indexOf('?') > 0) {
         (url + "&include_docs=true", pusheddown, queryUsed)
       } else {
@@ -123,14 +159,12 @@ class CloudantConfig(val protocol: String, val host: String,
     }
   }
 
-  /*
-  * Url for paging using skip and limit options when loading docs with partitions.
-  */
+
   def getSubSetUrl(url: String, skip: Int, limit: Int, queryUsed: Boolean): String = {
     val suffix = {
       if (url.indexOf(JsonStoreConfigManager.ALL_DOCS_INDEX) > 0) {
         "include_docs=true&limit=" + limit + "&skip=" + skip
-      } else if (viewName != null) {
+      } else if (viewPath != null) {
         "limit=" + limit + "&skip=" + skip
       } else if (queryUsed) {
         ""
@@ -170,13 +204,13 @@ class CloudantConfig(val protocol: String, val host: String,
         }
       }
       (dbUrl + "/" + defaultIndex + condition, true, false)
-    } else if (indexName != null) {
+    } else if (indexPath != null) {
       //  push down to indexName
       val condition = calculateCondition(field, start, startInclusive,
         end, endInclusive)
-      (dbUrl + "/" + indexName + "?q=" + condition, true, false)
-    } else if (viewName != null) {
-      (dbUrl + "/" + viewName, false, false)
+      (dbUrl + "/" + indexPath + "?q=" + condition, true, false)
+    } else if (viewPath != null) {
+      (dbUrl + "/" + viewPath, false, false)
     } else if (allowQuery && useQuery) {
       (s"$dbUrl/_find", false, true)
     } else {
@@ -220,39 +254,41 @@ class CloudantConfig(val protocol: String, val host: String,
     }
   }
 
-  def getTotalRows(result: JsValue): Int = {
-    val resultKeys = result.as[JsObject].keys
-    if(resultKeys.contains("total_rows")) {
-      (result \ "total_rows").as[Int]
-    } else if (resultKeys.contains("pending")) {
-      (result \ "pending").as[Int] + 1
+  def getResultCount(result: String): Int = {
+    val jsonResult: JsonObject = new JsonParser().parse(result).getAsJsonObject
+    if (jsonResult.has("total_rows")) {
+      jsonResult.get("total_rows").getAsInt
+    } else if (jsonResult.has("pending")) {
+      jsonResult.get("pending").getAsInt + 1
     } else {
       1
     }
   }
 
-  def getRows(result: JsValue, queryUsed: Boolean): Seq[JsValue] = {
-    if ( queryUsed ) {
-      (result \ "docs").as[JsArray].value.map(row => row)
+  def getRows(result: String, queryUsed: Boolean): Seq[JsonObject] = {
+    val jsonResult: JsonObject = new JsonParser().parse(result).getAsJsonObject
+    if (queryUsed) {
+      if (jsonResult.has("docs")) {
+        jsonResult.get("docs").getAsJsonArray.asScala
+          .map(row => row.getAsJsonObject).toSeq
+      } else {
+        Seq()
+      }
     } else {
-      val containsResultsKey: Boolean = result.as[JsObject].keys.contains("results")
-      if (containsResultsKey) {
-        (result \ "results").as[JsArray].value.map(row => (row \ "doc").get)
-      } else if (viewName == null) {
-        (result \ "rows").as[JsArray].value.map(row => (row \ "doc").get)
+      if (jsonResult.has("results")) {
+        jsonResult.get("results").getAsJsonArray.asScala.map(row => row.getAsJsonObject
+          .get("doc").getAsJsonObject).toSeq
+      } else if (viewPath == null) {
+        jsonResult.get("rows").getAsJsonArray.asScala.map(row => row.getAsJsonObject
+          .get("doc").getAsJsonObject).toSeq
       } else {
-        (result \ "rows").as[JsArray].value.map(row => row)
+        jsonResult.get("rows").getAsJsonArray.asScala.map(row => row.getAsJsonObject).toSeq
       }
     }
   }
 
-  def getBulkPostUrl: String = {
-    dbUrl + "/_bulk_docs"
-  }
-
-  def getBulkRows(rows: List[String]): String = {
-    val docs = rows.map { x => Json.parse(x) }
-    Json.stringify(Json.obj("docs" -> Json.toJson(docs.toSeq)))
+  def getBulkRows(rows: List[String]): List[JsonObject] = {
+    rows.map { x => JsonConverter.toJson(x).getAsJsonObject }
   }
 
   def getConflictErrStr: String = {

http://git-wip-us.apache.org/repos/asf/bahir/blob/6ea42a89/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/CloudantReceiver.scala
----------------------------------------------------------------------
diff --git a/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/CloudantReceiver.scala b/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/CloudantReceiver.scala
index 60a7d4a..029cabc 100644
--- a/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/CloudantReceiver.scala
+++ b/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/CloudantReceiver.scala
@@ -16,8 +16,10 @@
  */
 package org.apache.bahir.cloudant
 
-import play.api.libs.json.Json
-import scalaj.http._
+import java.io.{BufferedReader, InputStreamReader}
+import java.util.concurrent.TimeUnit
+
+import okhttp3._
 
 import org.apache.spark.SparkConf
 import org.apache.spark.storage.StorageLevel
@@ -42,50 +44,43 @@ class CloudantReceiver(sparkConf: SparkConf, cloudantParams: Map[String, String]
   }
 
   private def receive(): Unit = {
-    val url = config.getContinuousChangesUrl.toString
-    val selector: String = if (config.getSelector != null) {
-      "{\"selector\":" + config.getSelector + "}"
-    } else {
-      "{}"
-    }
+    val okHttpClient: OkHttpClient = new OkHttpClient.Builder()
+      .connectTimeout(5, TimeUnit.SECONDS)
+      .readTimeout(60, TimeUnit.SECONDS)
+      .build
+    val url = config.getChangesReceiverUrl.toString
 
-    val clRequest: HttpRequest = config.username match {
-      case null =>
-        Http(url)
-          .postData(selector)
-          .timeout(connTimeoutMs = 1000, readTimeoutMs = 0)
-          .header("Content-Type", "application/json")
-          .header("User-Agent", "spark-cloudant")
-      case _ =>
-        Http(url)
-          .postData(selector)
-          .timeout(connTimeoutMs = 1000, readTimeoutMs = 0)
-          .header("Content-Type", "application/json")
-          .header("User-Agent", "spark-cloudant")
-          .auth(config.username, config.password)
+    val builder = new Request.Builder().url(url)
+    if (config.username != null) {
+      val credential = Credentials.basic(config.username, config.password)
+      builder.header("Authorization", credential)
+    }
+    if(config.getSelector != null) {
+      val jsonType = MediaType.parse("application/json; charset=utf-8")
+      val selector = "{\"selector\":" + config.getSelector + "}"
+      val selectorBody = RequestBody.create(jsonType, selector)
+      builder.post(selectorBody)
     }
 
-    clRequest.exec((code, headers, is) => {
-      if (code == 200) {
-        scala.io.Source.fromInputStream(is, "utf-8").getLines().foreach(line => {
-          if (line.length() > 0) {
-            val json = Json.parse(line)
-            val jsonDoc = (json \ "doc").getOrElse(null)
-            var doc = ""
-            if(jsonDoc != null) {
-              doc = Json.stringify(jsonDoc)
-              if(!isStopped() && doc.nonEmpty) {
-                store(doc)
-              }
-            }
+    val request = builder.build
+    val response = okHttpClient.newCall(request).execute
+    val status_code = response.code
+
+    if (status_code == 200) {
+      val changesInputStream = response.body.byteStream
+      if (changesInputStream != null) {
+        val bufferedReader = new BufferedReader(new InputStreamReader(changesInputStream))
+        Iterator.continually(ChangesRowScanner.readRowFromReader(bufferedReader))
+          .takeWhile(_ != null).foreach { json => // stop at the first null row
+          if (!isStopped() && !json.getDoc.has("_deleted")) {
+            store(json.getDoc.toString)
           }
-        })
-      } else {
-        val status = headers.getOrElse("Status", IndexedSeq.empty)
-        val errorMsg = "Error retrieving _changes feed " + config.getDbname + ": " + status(0)
-        reportError(errorMsg, new CloudantException(errorMsg))
+        }
       }
-    })
+    } else {
+      val errorMsg = "Error retrieving _changes feed " + config.getDbname + ": " + status_code
+      reportError(errorMsg, new CloudantException(errorMsg))
+    }
   }
 
   def onStop(): Unit = {

http://git-wip-us.apache.org/repos/asf/bahir/blob/6ea42a89/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/DefaultSource.scala
----------------------------------------------------------------------
diff --git a/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/DefaultSource.scala b/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/DefaultSource.scala
index 2685993..47643cc 100644
--- a/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/DefaultSource.scala
+++ b/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/DefaultSource.scala
@@ -109,11 +109,11 @@ class DefaultSource extends RelationProvider
       if (inSchema != null) {
         inSchema
       } else if (!config.isInstanceOf[CloudantChangesConfig]
-        || config.viewName != null || config.indexName != null) {
+        || config.viewPath != null || config.indexPath != null) {
         val df = if (config.getSchemaSampleSize ==
           JsonStoreConfigManager.ALLDOCS_OR_CHANGES_LIMIT &&
-          config.viewName == null
-          && config.indexName == null) {
+          config.viewPath == null
+          && config.indexPath == null) {
           val cloudantRDD = new JsonStoreRDD(sqlContext.sparkContext, config)
           dataFrame = sqlContext.read.json(cloudantRDD.toDS())
           dataFrame
@@ -144,12 +144,7 @@ class DefaultSource extends RelationProvider
         // Collect and union each RDD to convert all RDDs to a DataFrame
         changes.foreachRDD((rdd: RDD[String]) => {
           if (!rdd.isEmpty()) {
-            if (globalRDD != null) {
-              // Union RDDs in foreach loop
-              globalRDD = globalRDD.union(rdd)
-            } else {
-              globalRDD = rdd
-            }
+            globalRDD = rdd ++ globalRDD
           } else {
             // Convert final global RDD[String] to DataFrame
             dataFrame = sqlContext.sparkSession.read.json(globalRDD.toDS())

http://git-wip-us.apache.org/repos/asf/bahir/blob/6ea42a89/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/common/FilterUtil.scala
----------------------------------------------------------------------
diff --git a/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/common/FilterUtil.scala b/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/common/FilterUtil.scala
index ef1f7da..79f681d 100644
--- a/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/common/FilterUtil.scala
+++ b/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/common/FilterUtil.scala
@@ -16,12 +16,11 @@
  */
 package org.apache.bahir.cloudant.common
 
+import com.google.gson.JsonObject
 import org.slf4j.LoggerFactory
-import play.api.libs.json.{JsObject, JsString, JsValue}
 
 import org.apache.spark.sql.sources._
 
-
 /**
  * Only handles the following filter condition
  * 1. EqualTo,GreaterThan,LessThan,GreaterThanOrEqual,LessThanOrEqual,In
@@ -118,11 +117,11 @@ class FilterInterpreter(origFilters: Array[Filter]) {
  */
 class FilterUtil(filters: Map[String, Array[Filter]]) {
   private val logger = LoggerFactory.getLogger(getClass)
-  def apply(implicit r: JsValue = null): Boolean = {
+  def apply(implicit r: JsonObject = null): Boolean = {
     if (r == null) return true
     val satisfied = filters.forall({
       case (attr, filters) =>
-        val field = JsonUtil.getField(r, attr).getOrElse(null)
+        val field = JsonUtil.getField(r, attr).orNull
         if (field == null) {
           logger.debug(s"field $attr not exisit:$r")
           false
@@ -136,12 +135,10 @@ class FilterUtil(filters: Map[String, Array[Filter]]) {
 
 
 object FilterDDocs {
-  def filter(row: JsValue): Boolean = {
+  def filter(row: JsonObject): Boolean = {
     if (row == null) return true
-    val id : String = if (row.as[JsObject].keys.contains("_id")) {
-      JsonUtil.getField(row, "_id").orNull.as[JsString].value
-    } else if (row.as[JsObject].keys.contains("id")) {
-      JsonUtil.getField(row, "id").orNull.as[JsString].value
+    val id : String = if (row.has("_id")) {
+      row.get("_id").getAsString
     } else {
       null
     }

http://git-wip-us.apache.org/repos/asf/bahir/blob/6ea42a89/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/common/JsonStoreConfigManager.scala
----------------------------------------------------------------------
diff --git a/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/common/JsonStoreConfigManager.scala b/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/common/JsonStoreConfigManager.scala
index 9cd495d..0e66f03 100644
--- a/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/common/JsonStoreConfigManager.scala
+++ b/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/common/JsonStoreConfigManager.scala
@@ -40,6 +40,7 @@ object JsonStoreConfigManager {
   private val CLOUDANT_CHANGES_TIMEOUT = "cloudant.timeout"
   private val USE_QUERY_CONFIG = "cloudant.useQuery"
   private val QUERY_LIMIT_CONFIG = "cloudant.queryLimit"
+  private val NUMBER_OF_RETRIES = "cloudant.numberOfRetries"
   private val FILTER_SELECTOR = "selector"
 
   private val PARTITION_CONFIG = "jsonstore.rdd.partitions"
@@ -68,22 +69,27 @@ object JsonStoreConfigManager {
 
   private def getInt(sparkConf: SparkConf, parameters: Map[String, String],
                      key: String) : Int = {
-    val valueS = parameters.getOrElse(key, null)
-    if (sparkConf != null) {
-      val default = {
+    try {
+      val valueS = parameters.getOrElse(key, null)
+      if (sparkConf != null) {
+        val default = {
+          if (valueS == null) {
+            sparkConf.getInt(key, rootConfig.getInt(key))
+          } else {
+            valueS.toInt
+          }
+        }
+        sparkConf.getInt(s"spark.$key", default)
+      } else {
         if (valueS == null) {
-          sparkConf.getInt(key, rootConfig.getInt(key))
+          rootConfig.getInt(key)
         } else {
           valueS.toInt
         }
       }
-      sparkConf.getInt(s"spark.$key", default)
-    } else {
-      if (valueS == null) {
-        rootConfig.getInt(key)
-      } else {
-        valueS.toInt
-      }
+    } catch {
+      case e: NumberFormatException =>
+        throw new CloudantException(s"Option \'$key\' failed with exception $e")
     }
   }
 
@@ -162,22 +168,24 @@ object JsonStoreConfigManager {
 
   def getConfig (sparkConf: SparkConf, parameters: Map[String, String]): CloudantConfig = {
 
-    implicit val total = getInt(sparkConf, parameters, PARTITION_CONFIG)
-    implicit val max = getInt(sparkConf, parameters, MAX_IN_PARTITION_CONFIG)
-    implicit val min = getInt(sparkConf, parameters, MIN_IN_PARTITION_CONFIG)
-    implicit val requestTimeout = getLong(sparkConf, parameters, REQUEST_TIMEOUT_CONFIG)
-    implicit val bulkSize = getInt(sparkConf, parameters, BULK_SIZE_CONFIG)
-    implicit val schemaSampleSize = getInt(sparkConf, parameters, SCHEMA_SAMPLE_SIZE_CONFIG)
-    implicit val createDBOnSave = getBool(sparkConf, parameters, CREATE_DB_ON_SAVE_CONFIG)
-    implicit val endpoint = getString(sparkConf, parameters, CLOUDANT_API_ENDPOINT)
-    implicit val selector = getString(sparkConf, parameters, FILTER_SELECTOR)
-    implicit val storageLevel = getStorageLevel(
+    implicit val total: Int = getInt(sparkConf, parameters, PARTITION_CONFIG)
+    implicit val max: Int = getInt(sparkConf, parameters, MAX_IN_PARTITION_CONFIG)
+    implicit val min: Int = getInt(sparkConf, parameters, MIN_IN_PARTITION_CONFIG)
+    implicit val requestTimeout: Long = getLong(sparkConf, parameters, REQUEST_TIMEOUT_CONFIG)
+    implicit val bulkSize: Int = getInt(sparkConf, parameters, BULK_SIZE_CONFIG)
+    implicit val schemaSampleSize: Int = getInt(sparkConf, parameters, SCHEMA_SAMPLE_SIZE_CONFIG)
+    implicit val createDBOnSave: Boolean = getBool(sparkConf, parameters, CREATE_DB_ON_SAVE_CONFIG)
+    implicit val endpoint: String = getString(sparkConf, parameters, CLOUDANT_API_ENDPOINT)
+    implicit val selector: String = getString(sparkConf, parameters, FILTER_SELECTOR)
+    implicit val storageLevel: StorageLevel = getStorageLevel(
       sparkConf, parameters, STORAGE_LEVEL_FOR_CHANGES_INDEX)
-    implicit val timeout = getInt(sparkConf, parameters, CLOUDANT_CHANGES_TIMEOUT)
-    implicit val batchInterval = getInt(sparkConf, parameters, CLOUDANT_STREAMING_BATCH_INTERVAL)
+    implicit val timeout: Int = getInt(sparkConf, parameters, CLOUDANT_CHANGES_TIMEOUT)
+    implicit val batchInterval: Int = getInt(
+      sparkConf, parameters, CLOUDANT_STREAMING_BATCH_INTERVAL)
+    implicit val numberOfRetries: Int = getInt(sparkConf, parameters, NUMBER_OF_RETRIES)
 
-    implicit val useQuery = getBool(sparkConf, parameters, USE_QUERY_CONFIG)
-    implicit val queryLimit = getInt(sparkConf, parameters, QUERY_LIMIT_CONFIG)
+    implicit val useQuery: Boolean = getBool(sparkConf, parameters, USE_QUERY_CONFIG)
+    implicit val queryLimit: Int = getInt(sparkConf, parameters, QUERY_LIMIT_CONFIG)
 
     val dbName = parameters.getOrElse("database", parameters.getOrElse("path",
       throw new CloudantException(s"Cloudant database name is empty. " +
@@ -193,13 +201,12 @@ object JsonStoreConfigManager {
     if (endpoint == ALL_DOCS_INDEX) {
       new CloudantConfig(protocol, host, dbName, indexName,
         viewName) (user, passwd, total, max, min, requestTimeout, bulkSize,
-        schemaSampleSize, createDBOnSave, endpoint, useQuery,
-        queryLimit)
+        schemaSampleSize, createDBOnSave, endpoint, useQuery, queryLimit, numberOfRetries)
     } else if (endpoint == CHANGES_INDEX) {
       new CloudantChangesConfig(protocol, host, dbName, indexName,
         viewName) (user, passwd, total, max, min, requestTimeout,
         bulkSize, schemaSampleSize, createDBOnSave, endpoint, selector,
-        timeout, storageLevel, useQuery, queryLimit, batchInterval)
+        timeout, storageLevel, useQuery, queryLimit, batchInterval, numberOfRetries)
     } else {
       throw new CloudantException(s"spark.$CLOUDANT_API_ENDPOINT parameter " +
         s"is invalid. Please supply the valid option '" + ALL_DOCS_INDEX + "' or '" +

http://git-wip-us.apache.org/repos/asf/bahir/blob/6ea42a89/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/common/JsonStoreDataAccess.scala
----------------------------------------------------------------------
diff --git a/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/common/JsonStoreDataAccess.scala b/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/common/JsonStoreDataAccess.scala
index 9d09ecb..3400468 100644
--- a/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/common/JsonStoreDataAccess.scala
+++ b/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/common/JsonStoreDataAccess.scala
@@ -18,32 +18,28 @@ package org.apache.bahir.cloudant.common
 
 import java.util.concurrent.atomic.AtomicInteger
 
+import scala.collection.JavaConverters._
 import scala.collection.mutable
 import scala.concurrent._
 import scala.concurrent.duration._
 import scala.language.implicitConversions
 import scala.util.{Failure, Success}
 
-import scalaj.http.{Http, HttpRequest, HttpResponse}
 import ExecutionContext.Implicits.global
+import com.cloudant.client.api.model.Response
+import com.cloudant.http.HttpConnection
+import com.google.gson.{Gson, JsonElement, JsonObject}
 import org.slf4j.{Logger, LoggerFactory}
-import play.api.libs.json._
 
 import org.apache.bahir.cloudant.CloudantConfig
 
-
 class JsonStoreDataAccess (config: CloudantConfig)  {
   lazy val logger: Logger = LoggerFactory.getLogger(getClass)
   implicit lazy val timeout: Long = config.requestTimeout
 
   def getMany(limit: Int)(implicit columns: Array[String] = null): Seq[String] = {
-    if (limit == 0) {
-      throw new CloudantException("Database " + config.getDbname +
-        " schema sample size is 0!")
-    }
-    if (limit < -1) {
-      throw new CloudantException("Database " + config.getDbname +
-        " schema sample size is " + limit + "!")
+    if (limit == 0 || limit < -1) {
+      throw new CloudantException("Schema size '" + limit + "' is not valid.")
     }
     var r = this.getQueryResult[Seq[String]](config.getUrl(limit), processAll)
     if (r.isEmpty) {
@@ -58,35 +54,30 @@ class JsonStoreDataAccess (config: CloudantConfig)  {
     }
   }
 
-  def getAll[T](url: String)
-      (implicit columns: Array[String] = null): Seq[String] = {
-    this.getQueryResult[Seq[String]](url, processAll)
-  }
-
   def getIterator(skip: Int, limit: Int, url: String)
       (implicit columns: Array[String] = null,
       postData: String = null): Iterator[String] = {
+    logger.info(s"Loading data from Cloudant using: $url , postData: $postData")
     val newUrl = config.getSubSetUrl(url, skip, limit, postData != null)
     this.getQueryResult[Iterator[String]](newUrl, processIterator)
   }
 
   def getTotalRows(url: String, queryUsed: Boolean)
-      (implicit postData: String = null): Int = {
-      if (queryUsed) config.queryLimit // Query can not retrieve total row now.
-      else {
-        val totalUrl = config.getTotalUrl(url)
-        this.getQueryResult[Int](totalUrl,
-          { result => config.getTotalRows(Json.parse(result))})
-      }
+                  (implicit postData: String = null): Int = {
+    if (queryUsed) config.queryLimit // Query can not retrieve total row now.
+    else {
+      val totalUrl = config.getTotalUrl(url)
+      this.getQueryResult[Int](totalUrl,
+        { result => config.getResultCount(result)})
+    }
   }
 
-  private def processAll (result: String)
+  private def processAll(result: String)
       (implicit columns: Array[String],
       postData: String = null) = {
     logger.debug(s"processAll:$result, columns:$columns")
-    val jsonResult: JsValue = Json.parse(result)
-    var rows = config.getRows(jsonResult, postData != null )
-    if (config.viewName == null && postData == null) {
+    var rows = config.getRows(result, postData != null)
+    if (config.viewPath == null && postData == null) {
       // filter design docs
       rows = rows.filter(r => FilterDDocs.filter(r))
     }
@@ -99,104 +90,43 @@ class JsonStoreDataAccess (config: CloudantConfig)  {
     processAll(result).iterator
   }
 
-  private def convert(rec: JsValue)(implicit columns: Array[String]): String = {
-    if (columns == null) return Json.stringify(Json.toJson(rec))
-    val m = new mutable.HashMap[String, JsValue]()
-    for ( x <- columns) {
-        val field = JsonUtil.getField(rec, x).getOrElse(JsNull)
-        m.put(x, field)
+  private def convert(rec: JsonElement)(implicit columns: Array[String]): String = {
+    if (columns == null) {
+      rec.getAsJsonObject.toString
+    } else {
+      val jsonObject = new JsonObject
+      for (x <- columns) {
+        val field = JsonUtil.getField(rec, x).orNull
+        jsonObject.add(x, field)
+      }
+      val result = jsonObject.toString
+      logger.debug(s"converted: $result")
+      result
     }
-    val result = Json.stringify(Json.toJson(m.toMap))
-    logger.debug(s"converted: $result")
-    result
-  }
-
-
-  def getChanges(url: String, processResults: (String) => String): String = {
-    getQueryResult(url, processResults)
   }
 
   private def getQueryResult[T]
-      (url: String, postProcessor: (String) => T)
-      (implicit columns: Array[String] = null,
-      postData: String = null) : T = {
+  (url: String, postProcessor: (String) => T)
+  (implicit columns: Array[String] = null,
+   postData: String = null) : T = {
     logger.info(s"Loading data from Cloudant using: $url , postData: $postData")
 
-    val clRequest: HttpRequest = getClRequest(url, postData)
+    val clRequest: HttpConnection = config.executeRequest(url, postData)
 
-    val clResponse: HttpResponse[String] = clRequest.execute()
-    if (! clResponse.isSuccess) {
+    val clResponse: HttpConnection = clRequest.execute()
+    if (clResponse.getConnection.getResponseCode != 200) {
       throw new CloudantException("Database " + config.getDbname +
-        " request error: " + clResponse.body)
+        " request error: " + clResponse.responseAsString)
     }
-    val data = postProcessor(clResponse.body)
-    logger.debug(s"got result: $data")
+    val data = postProcessor(clResponse.responseAsString)
+    logger.debug(s"got result:$data")
     data
   }
 
   def createDB(): Unit = {
-    val url = config.getDbUrl.toString
-    val clRequest: HttpRequest = getClRequest(url, null, "PUT")
-
-    val clResponse: HttpResponse[String] = clRequest.execute()
-    if (! clResponse.isSuccess) {
-      throw new CloudantException("Database " + config.getDbname +
-        " create error: " + clResponse.body)
-    } else {
-      logger.warn(s"Database ${config.getDbname} was created.")
-    }
+    config.getClient.createDB(config.getDbname)
   }
 
-
-  def getClRequest(url: String, postData: String = null,
-                   httpMethod: String = null): HttpRequest = {
-    val requestTimeout = config.requestTimeout.toInt
-    config.username match {
-      case null =>
-        if (postData != null) {
-          Http(url)
-            .postData(postData)
-            .timeout(connTimeoutMs = 1000, readTimeoutMs = requestTimeout)
-            .header("Content-Type", "application/json")
-            .header("User-Agent", "spark-cloudant")
-        } else {
-          if (httpMethod != null) {
-            Http(url)
-              .method(httpMethod)
-              .timeout(connTimeoutMs = 1000, readTimeoutMs = requestTimeout)
-              .header("User-Agent", "spark-cloudant")
-          } else {
-            Http(url)
-              .timeout(connTimeoutMs = 1000, readTimeoutMs = requestTimeout)
-              .header("User-Agent", "spark-cloudant")
-          }
-        }
-      case _ =>
-        if (postData != null) {
-          Http(url)
-            .postData(postData)
-            .timeout(connTimeoutMs = 1000, readTimeoutMs = requestTimeout)
-            .header("Content-Type", "application/json")
-            .header("User-Agent", "spark-cloudant")
-            .auth(config.username, config.password)
-        } else {
-          if (httpMethod != null) {
-            Http(url)
-              .method(httpMethod)
-              .timeout(connTimeoutMs = 1000, readTimeoutMs = requestTimeout)
-              .header("User-Agent", "spark-cloudant")
-              .auth(config.username, config.password)
-          } else {
-            Http(url)
-              .timeout(connTimeoutMs = 1000, readTimeoutMs = requestTimeout)
-              .header("User-Agent", "spark-cloudant")
-              .auth(config.username, config.password)
-          }
-        }
-    }
-  }
-
-
   def saveAll(rows: List[String]): Unit = {
     if (rows.isEmpty) return
     val bulkSize = config.bulkSize
@@ -205,31 +135,29 @@ class JsonStoreDataAccess (config: CloudantConfig)  {
     logger.debug(s"total records:${rows.size}=bulkSize:$bulkSize * totalBulks:$totalBulks")
 
     val futures = bulks.map( bulk => {
-      val data = config.getBulkRows(bulk)
-      val url = config.getBulkPostUrl.toString
-      val clRequest: HttpRequest = getClRequest(url, data)
+      val jsonData = config.getBulkRows(bulk)
         Future {
-          clRequest.execute()
+          config.getDatabase.bulk(jsonData.asJava)
         }
       }
     )
     // remaining - number of requests remained to succeed
     val remaining = new AtomicInteger(futures.length)
-    val p = Promise[HttpResponse[String]]
+    val p = Promise[java.util.List[Response]]
     futures foreach {
       _ onComplete {
-        case Success(clResponse: HttpResponse[String]) =>
-          // find if there was error in saving at least one of docs
-          val resBody: String = clResponse.body
-          val isErr = (resBody contains config.getConflictErrStr) ||
-            (resBody contains config.getForbiddenErrStr)
-          if (!clResponse.isSuccess || isErr) {
+        case Success(clResponses) =>
+          if (clResponses contains config.getConflictErrStr) {
+            val e = new CloudantException("Save to database:" + config.getDbname +
+              " failed with reason: " + config.getConflictErrStr)
+            p.tryFailure(e)
+          } else if (clResponses contains config.getForbiddenErrStr) {
             val e = new CloudantException("Save to database:" + config.getDbname +
-                " failed with reason: " + clResponse.body)
+              " failed with reason: " + config.getForbiddenErrStr)
             p.tryFailure(e)
           } else if (remaining.decrementAndGet() == 0) {
             // succeed the whole save operation if all requests success
-            p.trySuccess(clResponse)
+            p.trySuccess(clResponses)
           }
         // if a least one save request fails - fail the whole save operation
         case Failure(e) =>

http://git-wip-us.apache.org/repos/asf/bahir/blob/6ea42a89/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/common/JsonStoreRDD.scala
----------------------------------------------------------------------
diff --git a/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/common/JsonStoreRDD.scala b/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/common/JsonStoreRDD.scala
index c4f828d..2ba480d 100644
--- a/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/common/JsonStoreRDD.scala
+++ b/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/common/JsonStoreRDD.scala
@@ -16,8 +16,8 @@
  */
 package org.apache.bahir.cloudant.common
 
+import com.google.gson._
 import org.slf4j.LoggerFactory
-import play.api.libs.json.{Json, JsValue}
 
 import org.apache.spark.Partition
 import org.apache.spark.SparkContext
@@ -26,6 +26,7 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.sources._
 
 import org.apache.bahir.cloudant.CloudantConfig
+import org.apache.bahir.cloudant.common.JsonUtil.JsonConverter
 
 /**
  * JsonStoreRDDPartition defines each partition as a subset of a query result:
@@ -33,8 +34,9 @@ import org.apache.bahir.cloudant.CloudantConfig
  */
 
 private[cloudant] class JsonStoreRDDPartition(val url: String, val skip: Int, val limit: Int,
-    val idx: Int, val config: CloudantConfig, val selector: JsValue, val fields: JsValue,
-    val queryUsed: Boolean)
+                                              val idx: Int, val config: CloudantConfig,
+                                              val selector: JsonElement, val fields: JsonElement,
+                                              val queryUsed: Boolean)
     extends Partition with Serializable{
   val index: Int = idx
 }
@@ -92,7 +94,9 @@ class JsonStoreRDD(sc: SparkContext, config: CloudantConfig)
     }
   }
 
-  private def convertToMangoJson(f: Filter): (String, JsValue) = {
+  private def convertToMangoJson(f: Filter): (String, JsonElement) = {
+    val gson = new Gson
+    val parser = new JsonParser()
     val (op, value): (String, Any) = f match {
       case EqualTo(attr, v) => ("$eq", v)
       case GreaterThan(attr, v) => ("$gt", v)
@@ -101,16 +105,16 @@ class JsonStoreRDD(sc: SparkContext, config: CloudantConfig)
       case LessThanOrEqual(attr, v) => ("$lte", v)
       case _ => (null, null)
     }
-    val convertedV: JsValue = {
+    val convertedV: JsonElement = {
       // TODO Better handing of other types
       if (value != null) {
         value match {
-          case s: String => Json.toJson(s)
-          case l: Long => Json.toJson(l)
-          case d: Double => Json.toJson(d)
-          case i: Int => Json.toJson(i)
-          case b: Boolean => Json.toJson(b)
-          case t: java.sql.Timestamp => Json.toJson(t)
+          case s: String => parser.parse(s)
+          case l: Long => parser.parse(l.toString)
+          case d: Double => parser.parse(d.toString)
+          case i: Int => parser.parse(i.toString)
+          case b: Boolean => parser.parse(b.toString)
+          case t: java.sql.Timestamp => parser.parse(t.toString)
           case a: Any => logger.debug(s"Ignore field:$name, cannot handle its datatype: $a"); null
         }
       } else null
@@ -118,7 +122,7 @@ class JsonStoreRDD(sc: SparkContext, config: CloudantConfig)
     (op, convertedV)
   }
 
-  private def convertAttrToMangoJson(filters: Array[Filter]): Map[String, JsValue] = {
+  private def convertAttrToMangoJson(filters: Array[Filter]): Map[String, JsonElement] = {
     filters.map(af => convertToMangoJson(af))
             .filter(x => x._2 != null)
             .toMap
@@ -137,10 +141,10 @@ class JsonStoreRDD(sc: SparkContext, config: CloudantConfig)
       }
     }
 
-    val (selector, fields) : (JsValue, JsValue) = {
+    val (selector, fields) : (JsonElement, JsonElement) = {
       if (!config.queryEnabled || origAttrToFilters == null) (null, null)
       else {
-        val selectors: Map[String, Map[String, JsValue]] =
+        val selectors: Map[String, Map[String, JsonElement]] =
           origAttrToFilters.transform( (name, attrFilters) => convertAttrToMangoJson(attrFilters))
         val filteredSelectors = selectors.filter((t) => t._2.nonEmpty)
 
@@ -149,10 +153,10 @@ class JsonStoreRDD(sc: SparkContext, config: CloudantConfig)
               if (requiredcolumns == null || requiredcolumns.length == 0) {
                 null
               } else {
-                Json.toJson(requiredcolumns)
+                JsonConverter.toJson(requiredcolumns)
               }
           }
-          (Json.toJson(filteredSelectors), queryColumns)
+          (JsonConverter.toJson(filteredSelectors), queryColumns)
         } else (null, null)
       }
     }
@@ -174,7 +178,10 @@ class JsonStoreRDD(sc: SparkContext, config: CloudantConfig)
 
     implicit val postData : String = {
       if (queryUsed) {
-        Json.stringify(Json.obj("selector" -> selector, "limit" -> 1))
+        val jsonSelector = new JsonObject
+        jsonSelector.addProperty("selector", selector.toString)
+        jsonSelector.addProperty("limit", 1)
+        jsonSelector.toString
       } else {
         null
       }
@@ -191,7 +198,8 @@ class JsonStoreRDD(sc: SparkContext, config: CloudantConfig)
    (0 until totalPartition).map(i => {
       val skip = i * limitPerPartition
       new JsonStoreRDDPartition(url, skip, limitPerPartition, i,
-          config, selector, fields, queryUsed).asInstanceOf[Partition]
+          config, selector, fields, queryUsed)
+        .asInstanceOf[Partition]
     }).toArray
   }
 
@@ -199,12 +207,15 @@ class JsonStoreRDD(sc: SparkContext, config: CloudantConfig)
       Iterator[String] = {
     val myPartition = splitIn.asInstanceOf[JsonStoreRDDPartition]
     implicit val postData : String = {
+      val jsonObject = new JsonObject
+      jsonObject.add("selector", myPartition.selector)
+      jsonObject.addProperty("skip", myPartition.skip)
       if (myPartition.queryUsed && myPartition.fields != null) {
-        Json.stringify(Json.obj("selector" -> myPartition.selector, "fields" -> myPartition.fields,
-            "limit" -> myPartition.limit, "skip" -> myPartition.skip))
+        jsonObject.add("fields", myPartition.fields)
+        jsonObject.toString
       } else if (myPartition.queryUsed) {
-        Json.stringify(Json.obj("selector" -> myPartition.selector, "limit" -> myPartition.limit,
-            "skip" -> myPartition.skip))
+        jsonObject.addProperty("limit", myPartition.limit)
+        jsonObject.toString
       } else {
         null
       }

http://git-wip-us.apache.org/repos/asf/bahir/blob/6ea42a89/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/common/JsonUtil.scala
----------------------------------------------------------------------
diff --git a/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/common/JsonUtil.scala b/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/common/JsonUtil.scala
index 82d9afc..eadceea 100644
--- a/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/common/JsonUtil.scala
+++ b/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/common/JsonUtil.scala
@@ -16,27 +16,38 @@
  */
 package org.apache.bahir.cloudant.common
 
-import play.api.libs.json.JsValue
 import scala.util.control.Breaks._
 
+import com.google.gson.{JsonElement, JsonParser}
+
 object JsonUtil {
-  def getField(row: JsValue, field: String) : Option[JsValue] = {
+  def getField(row: JsonElement, field: String) : Option[JsonElement] = {
     var path = field.split('.')
     var currentValue = row
-    var finalValue: Option[JsValue] = None
+    var finalValue: Option[JsonElement] = None
     breakable {
       for (i <- path.indices) {
-        val f: Option[JsValue] = (currentValue \ path(i)).toOption
-        f match {
-          case Some(f2) => currentValue = f2
-          case None => break
-        }
-        if (i == path.length -1) {
-          // The leaf node
-          finalValue = Some(currentValue)
+        if (currentValue != null && currentValue.isJsonObject) {
+          val f: Option[JsonElement] =
+            Option(currentValue.getAsJsonObject.get(path(i)))
+          f match {
+            case Some(f2) => currentValue = f2
+            case None => break
+          }
+          if (i == path.length - 1) {
+            // The leaf node
+            finalValue = Some(currentValue)
+          }
         }
       }
     }
     finalValue
   }
+
+  object JsonConverter {
+    val parser = new JsonParser
+    def toJson(value: Any): JsonElement = {
+      parser.parse(value.toString)
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/bahir/blob/6ea42a89/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/internal/ChangesReceiver.scala
----------------------------------------------------------------------
diff --git a/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/internal/ChangesReceiver.scala b/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/internal/ChangesReceiver.scala
index 323aab6..56671b3 100644
--- a/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/internal/ChangesReceiver.scala
+++ b/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/internal/ChangesReceiver.scala
@@ -17,8 +17,10 @@
 package org.apache.bahir.cloudant.internal
 
 import java.io.{BufferedReader, InputStreamReader}
+import java.util.concurrent.TimeUnit
 
-import scalaj.http._
+import com.google.gson.JsonParser
+import okhttp3._
 
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming.receiver.Receiver
@@ -37,45 +39,46 @@ class ChangesReceiver(config: CloudantChangesConfig)
   }
 
   private def receive(): Unit = {
-    // Get normal _changes url
+    val okHttpClient: OkHttpClient = new OkHttpClient.Builder()
+        .connectTimeout(5, TimeUnit.SECONDS)
+        .readTimeout(60, TimeUnit.SECONDS)
+        .build
     val url = config.getChangesReceiverUrl.toString
-    val selector: String = {
-      "{\"selector\":" + config.getSelector + "}"
-    }
 
-    val clRequest: HttpRequest = config.username match {
-      case null =>
-        Http(url)
-          .postData(selector)
-          .header("Content-Type", "application/json")
-          .header("User-Agent", "spark-cloudant")
-      case _ =>
-        Http(url)
-          .postData(selector)
-          .header("Content-Type", "application/json")
-          .header("User-Agent", "spark-cloudant")
-          .auth(config.username, config.password)
+    val builder = new Request.Builder().url(url)
+    if (config.username != null) {
+      val credential = Credentials.basic(config.username, config.password)
+      builder.header("Authorization", credential)
+    }
+    if(config.getSelector != null) {
+      val jsonType = MediaType.parse("application/json; charset=utf-8")
+      val selector = "{\"selector\":" + config.getSelector + "}"
+      val selectorBody = RequestBody.create(jsonType, selector)
+      builder.post(selectorBody)
     }
 
-    clRequest.exec((code, headers, is) => {
-      if (code == 200) {
-        var json = new ChangesRow()
-        if (is != null) {
-          val bufferedReader = new BufferedReader(new InputStreamReader(is))
-          while ((json = ChangesRowScanner.readRowFromReader(bufferedReader)) != null) {
-            if (!isStopped() && json != null && !json.getDoc.has("_deleted")) {
-              store(json.getDoc.toString)
-            }
+    val request = builder.build
+    val response = okHttpClient.newCall(request).execute
+    val status_code = response.code
+
+    if (status_code == 200) {
+      val changesInputStream = response.body.byteStream
+      var json = new ChangesRow()
+      if (changesInputStream != null) {
+        val bufferedReader = new BufferedReader(new InputStreamReader(changesInputStream))
+        while ((json = ChangesRowScanner.readRowFromReader(bufferedReader)) != null) {
+          if (!isStopped() && json != null && !json.getDoc.has("_deleted")) {
+            store(json.getDoc.toString)
           }
         }
-      } else {
-        val status = headers.getOrElse("Status", IndexedSeq.empty)
-        val errorMsg = "Error retrieving _changes feed data from database " +
-          "'" + config.getDbname + "': " + status(0)
-        reportError(errorMsg, new CloudantException(errorMsg))
-        CloudantChangesConfig.receiverErrorMsg = errorMsg
       }
-    })
+    } else {
+      val responseAsJson = new JsonParser().parse(response.body.string)
+      val errorMsg = "Error retrieving _changes feed data from database " + "'" +
+        config.getDbname + "' with response code " + status_code + ": " + responseAsJson.toString
+      reportError(errorMsg, new CloudantException(errorMsg))
+      CloudantChangesConfig.receiverErrorMsg = errorMsg
+    }
   }
 
   override def onStop(): Unit = {

http://git-wip-us.apache.org/repos/asf/bahir/blob/6ea42a89/sql-cloudant/src/test/scala/org/apache/bahir/cloudant/CloudantOptionSuite.scala
----------------------------------------------------------------------
diff --git a/sql-cloudant/src/test/scala/org/apache/bahir/cloudant/CloudantOptionSuite.scala b/sql-cloudant/src/test/scala/org/apache/bahir/cloudant/CloudantOptionSuite.scala
index 8495026..c487937 100644
--- a/sql-cloudant/src/test/scala/org/apache/bahir/cloudant/CloudantOptionSuite.scala
+++ b/sql-cloudant/src/test/scala/org/apache/bahir/cloudant/CloudantOptionSuite.scala
@@ -42,7 +42,6 @@ class CloudantOptionSuite extends ClientSparkFunSuite with BeforeAndAfter {
     }
     assert(thrown.getMessage === s"spark.cloudant.endpoint parameter " +
       s"is invalid. Please supply the valid option '_all_docs' or '_changes'.")
-
   }
 
   testIfEnabled("empty username option throws an error message") {
@@ -99,7 +98,35 @@ class CloudantOptionSuite extends ClientSparkFunSuite with BeforeAndAfter {
     val thrown = intercept[CloudantException] {
       spark.read.format("org.apache.bahir.cloudant").load("n_flight")
     }
-    assert(thrown.getMessage === s"Error retrieving _changes feed data" +
-      s" from database 'n_flight': HTTP/1.1 401 Unauthorized")
+    assert(thrown.getMessage === "Error retrieving _changes feed data" +
+      " from database 'n_flight' with response code 401: {\"error\":\"unauthorized\"," +
+      "\"reason\":\"Name or password is incorrect.\"}")
+  }
+
+  testIfEnabled("string with valid value for cloudant.numberOfRetries option") {
+    spark = SparkSession.builder().config(conf)
+      .config("cloudant.host", TestUtils.getHost)
+      .config("cloudant.username", TestUtils.getUsername)
+      .config("cloudant.password", TestUtils.getPassword)
+      .config("cloudant.numberOfRetries", "5")
+      .getOrCreate()
+
+    val df = spark.read.format("org.apache.bahir.cloudant").load("n_booking")
+    assert(df.count() === 2)
+  }
+
+  testIfEnabled("invalid value for cloudant.numberOfRetries option throws an error message") {
+    spark = SparkSession.builder().config(conf)
+      .config("cloudant.host", TestUtils.getHost)
+      .config("cloudant.username", TestUtils.getUsername)
+      .config("cloudant.password", TestUtils.getPassword)
+      .config("cloudant.numberOfRetries", "five")
+      .getOrCreate()
+
+    val thrown = intercept[CloudantException] {
+      spark.read.format("org.apache.bahir.cloudant").load("db")
+    }
+    assert(thrown.getMessage === s"Option \'cloudant.numberOfRetries\' failed with exception " +
+      s"""java.lang.NumberFormatException: For input string: "five"""")
   }
 }