You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bahir.apache.org by lr...@apache.org on 2018/01/26 15:43:53 UTC
bahir git commit: [BAHIR-154] Refactor sql-cloudant to use
cloudant-client library
Repository: bahir
Updated Branches:
refs/heads/master 785ee1e1a -> 6ea42a896
[BAHIR-154] Refactor sql-cloudant to use cloudant-client library
- Use java-cloudant’s executeRequest for HTTP requests against
the _all_docs endpoint
- Added HTTP 429 backoff with default settings
- Simplified the exception and message raised for an invalid schema sample size
- Replaced the scalaj-http library with the OkHttp library for the changes receiver
- Updated streaming CloudantReceiver class to use improved
ChangesRowScanner method
- Replaced Play JSON with GSON library
- Updated save operation to use java-cloudant bulk API
- Use _changes feed filter option for Cloudant/CouchDB 2.x and greater
Closes #61
Project: http://git-wip-us.apache.org/repos/asf/bahir/repo
Commit: http://git-wip-us.apache.org/repos/asf/bahir/commit/6ea42a89
Tree: http://git-wip-us.apache.org/repos/asf/bahir/tree/6ea42a89
Diff: http://git-wip-us.apache.org/repos/asf/bahir/diff/6ea42a89
Branch: refs/heads/master
Commit: 6ea42a8965d98773a342ad5cd31aab6b64e9d9bd
Parents: 785ee1e
Author: Esteban Laver <em...@us.ibm.com>
Authored: Sun Dec 17 13:30:04 2017 -0500
Committer: Luciano Resende <lr...@apache.org>
Committed: Fri Jan 26 07:43:07 2018 -0800
----------------------------------------------------------------------
sql-cloudant/README.md | 1 +
sql-cloudant/pom.xml | 21 +--
.../src/main/resources/application.conf | 1 +
.../bahir/cloudant/CloudantChangesConfig.scala | 15 +-
.../apache/bahir/cloudant/CloudantConfig.scala | 136 +++++++++------
.../bahir/cloudant/CloudantReceiver.scala | 77 ++++-----
.../apache/bahir/cloudant/DefaultSource.scala | 13 +-
.../bahir/cloudant/common/FilterUtil.scala | 15 +-
.../common/JsonStoreConfigManager.scala | 63 ++++---
.../cloudant/common/JsonStoreDataAccess.scala | 172 ++++++-------------
.../bahir/cloudant/common/JsonStoreRDD.scala | 55 +++---
.../apache/bahir/cloudant/common/JsonUtil.scala | 33 ++--
.../cloudant/internal/ChangesReceiver.scala | 69 ++++----
.../bahir/cloudant/CloudantOptionSuite.scala | 33 +++-
14 files changed, 358 insertions(+), 346 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/bahir/blob/6ea42a89/sql-cloudant/README.md
----------------------------------------------------------------------
diff --git a/sql-cloudant/README.md b/sql-cloudant/README.md
index 160e279..b651990 100644
--- a/sql-cloudant/README.md
+++ b/sql-cloudant/README.md
@@ -63,6 +63,7 @@ cloudant.protocol|https|protocol to use to transfer data: http or https
cloudant.host| |cloudant host url
cloudant.username| |cloudant userid
cloudant.password| |cloudant password
+cloudant.numberOfRetries|3| number of times to replay a request that received a 429 `Too Many Requests` response
cloudant.useQuery|false|by default, `_all_docs` endpoint is used if configuration 'view' and 'index' (see below) are not set. When useQuery is enabled, `_find` endpoint will be used in place of `_all_docs` when query condition is not on primary key field (_id), so that query predicates may be driven into datastore.
cloudant.queryLimit|25|the maximum number of results returned when querying the `_find` endpoint.
cloudant.storageLevel|MEMORY_ONLY|the storage level for persisting Spark RDDs during load when `cloudant.endpoint` is set to `_changes`. See [RDD Persistence section](https://spark.apache.org/docs/latest/programming-guide.html#rdd-persistence) in Spark's Progamming Guide for all available storage level options.
http://git-wip-us.apache.org/repos/asf/bahir/blob/6ea42a89/sql-cloudant/pom.xml
----------------------------------------------------------------------
diff --git a/sql-cloudant/pom.xml b/sql-cloudant/pom.xml
index 55a5210..672418a 100644
--- a/sql-cloudant/pom.xml
+++ b/sql-cloudant/pom.xml
@@ -36,8 +36,14 @@
<dependencies>
<dependency>
- <groupId>com.typesafe.play</groupId>
- <artifactId>play-json_${scala.binary.version}</artifactId>
+ <groupId>com.cloudant</groupId>
+ <artifactId>cloudant-client</artifactId>
+ <version>2.11.0</version>
+ </dependency>
+ <dependency>
+ <groupId>com.squareup.okhttp3</groupId>
+ <artifactId>okhttp</artifactId>
+ <version>3.9.0</version>
</dependency>
<dependency>
<groupId>com.typesafe</groupId>
@@ -45,11 +51,6 @@
<version>1.3.1</version>
</dependency>
<dependency>
- <groupId>org.joda</groupId>
- <artifactId>joda-convert</artifactId>
- <version>1.8.1</version>
- </dependency>
- <dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
<version>2.6.7</version>
@@ -106,12 +107,6 @@
<artifactId>scalacheck_${scala.binary.version}</artifactId>
<scope>test</scope>
</dependency>
- <dependency>
- <groupId>com.cloudant</groupId>
- <artifactId>cloudant-client</artifactId>
- <version>2.11.0</version>
- <scope>test</scope>
- </dependency>
</dependencies>
<build>
<resources>
http://git-wip-us.apache.org/repos/asf/bahir/blob/6ea42a89/sql-cloudant/src/main/resources/application.conf
----------------------------------------------------------------------
diff --git a/sql-cloudant/src/main/resources/application.conf b/sql-cloudant/src/main/resources/application.conf
index 6ff4139..7f7beaf 100644
--- a/sql-cloudant/src/main/resources/application.conf
+++ b/sql-cloudant/src/main/resources/application.conf
@@ -11,6 +11,7 @@ spark-sql {
cloudant = {
batchInterval = 8
endpoint = "_all_docs"
+ numberOfRetries = 3
protocol = https
useQuery = false
queryLimit = 25
http://git-wip-us.apache.org/repos/asf/bahir/blob/6ea42a89/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/CloudantChangesConfig.scala
----------------------------------------------------------------------
diff --git a/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/CloudantChangesConfig.scala b/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/CloudantChangesConfig.scala
index 9f2a7ba..0615e16 100644
--- a/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/CloudantChangesConfig.scala
+++ b/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/CloudantChangesConfig.scala
@@ -27,10 +27,10 @@ class CloudantChangesConfig(protocol: String, host: String, dbName: String,
bulkSize: Int, schemaSampleSize: Int,
createDBOnSave: Boolean, endpoint: String, selector: String,
timeout: Int, storageLevel: StorageLevel, useQuery: Boolean,
- queryLimit: Int, batchInterval: Int)
+ queryLimit: Int, batchInterval: Int, numberOfRetries: Int)
extends CloudantConfig(protocol, host, dbName, indexName, viewName)(username, password,
partitions, maxInPartition, minInPartition, requestTimeout, bulkSize, schemaSampleSize,
- createDBOnSave, endpoint, useQuery, queryLimit) {
+ createDBOnSave, endpoint, useQuery, queryLimit, numberOfRetries) {
override val defaultIndex: String = endpoint
@@ -42,9 +42,14 @@ class CloudantChangesConfig(protocol: String, host: String, dbName: String,
if (selector != null && !selector.isEmpty) {
selector
} else {
- // Exclude design docs and deleted=true docs
- "{ \"_id\": { \"$regex\": \"^(?!_design/)\" }, " +
- "\"_deleted\": { \"$exists\": false } }"
+ val version = getClient.serverVersion
+ if (version.matches("1.*")) {
+ null
+ } else {
+ // Exclude design docs and deleted=true docs
+ "{ \"_id\": { \"$regex\": \"^(?!_design/)\" }, " +
+ "\"_deleted\": { \"$exists\": false } }"
+ }
}
}
http://git-wip-us.apache.org/repos/asf/bahir/blob/6ea42a89/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/CloudantConfig.scala
----------------------------------------------------------------------
diff --git a/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/CloudantConfig.scala b/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/CloudantConfig.scala
index 8affd4f..c7370f0 100644
--- a/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/CloudantConfig.scala
+++ b/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/CloudantConfig.scala
@@ -16,11 +16,21 @@
*/
package org.apache.bahir.cloudant
-import java.net.URLEncoder
+import java.net.{URL, URLEncoder}
-import play.api.libs.json.{JsArray, JsObject, Json, JsValue}
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ListBuffer
+import scala.reflect.io.File
+
+import com.cloudant.client.api.{ClientBuilder, CloudantClient, Database}
+import com.cloudant.client.api.model.SearchResult
+import com.cloudant.client.api.views._
+import com.cloudant.http.{Http, HttpConnection}
+import com.cloudant.http.interceptors.Replay429Interceptor
+import com.google.gson.{JsonObject, JsonParser}
import org.apache.bahir.cloudant.common._
+import org.apache.bahir.cloudant.common.JsonUtil.JsonConverter
/*
* Only allow one field pushdown now
@@ -28,22 +38,48 @@ import org.apache.bahir.cloudant.common._
*/
class CloudantConfig(val protocol: String, val host: String,
- val dbName: String, val indexName: String, val viewName: String)
+ val dbName: String, val indexPath: String, val viewPath: String)
(implicit val username: String, val password: String,
val partitions: Int, val maxInPartition: Int, val minInPartition: Int,
val requestTimeout: Long, val bulkSize: Int, val schemaSampleSize: Int,
val createDBOnSave: Boolean, val endpoint: String,
- val useQuery: Boolean = false, val queryLimit: Int)
+ val useQuery: Boolean = false, val queryLimit: Int,
+ val numberOfRetries: Int)
extends Serializable {
+ @transient private lazy val client: CloudantClient = ClientBuilder
+ .url(getClientUrl)
+ .username(username)
+ .password(password)
+ .interceptors(new Replay429Interceptor(numberOfRetries, 250L))
+ .build
+ @transient private lazy val database: Database = client.database(dbName, false)
lazy val dbUrl: String = {protocol + "://" + host + "/" + dbName}
val pkField = "_id"
val defaultIndex: String = endpoint
val default_filter: String = "*:*"
- def getDbUrl: String = {
- dbUrl
+ def executeRequest(stringUrl: String, postData: String = null): HttpConnection = {
+ val url = new URL(stringUrl)
+ if(postData != null) {
+ val conn = Http.POST(url, "application/json")
+ conn.setRequestBody(postData)
+ conn.requestProperties.put("User-Agent", "spark-cloudant")
+ client.executeRequest(conn)
+ } else {
+ val conn = Http.GET(url)
+ conn.requestProperties.put("User-Agent", "spark-cloudant")
+ client.executeRequest(conn)
+ }
+ }
+
+ def getClient: CloudantClient = {
+ client
+ }
+
+ def getDatabase: Database = {
+ database
}
def getSchemaSampleSize: Int = {
@@ -54,20 +90,20 @@ class CloudantConfig(val protocol: String, val host: String,
createDBOnSave
}
- def getLastNum(result: JsValue): JsValue = (result \ "last_seq").get
+ def getClientUrl: URL = {
+ new URL(protocol + "://" + host)
+ }
+
+ def getLastNum(result: JsonObject): JsonObject = result.get("last_seq").getAsJsonObject
/* Url containing limit for docs in a Cloudant database.
* If a view is not defined, use the _all_docs endpoint.
* @return url with one doc limit for retrieving total doc count
*/
def getUrl(limit: Int, excludeDDoc: Boolean = false): String = {
- if (viewName == null) {
+ if (viewPath == null) {
val baseUrl = {
- if (excludeDDoc) {
- dbUrl + "/_all_docs?startkey=%22_design0/%22&include_docs=true"
- } else {
- dbUrl + "/_all_docs?include_docs=true"
- }
+ dbUrl + "/_all_docs?include_docs=true"
}
if (limit == JsonStoreConfigManager.ALLDOCS_OR_CHANGES_LIMIT) {
baseUrl
@@ -76,16 +112,16 @@ class CloudantConfig(val protocol: String, val host: String,
}
} else {
if (limit == JsonStoreConfigManager.ALLDOCS_OR_CHANGES_LIMIT) {
- dbUrl + "/" + viewName
+ dbUrl + "/" + viewPath
} else {
- dbUrl + "/" + viewName + "?limit=" + limit
+ dbUrl + "/" + viewPath + "?limit=" + limit
}
}
}
- /* Url containing limit to count total docs in a Cloudant database.
+ /* Total count of documents in a Cloudant database.
*
- * @return url with one doc limit for retrieving total doc count
+ * @return total doc count number
*/
def getTotalUrl(url: String): String = {
if (url.contains('?')) {
@@ -100,10 +136,10 @@ class CloudantConfig(val protocol: String, val host: String,
}
def queryEnabled: Boolean = {
- useQuery && indexName == null && viewName == null
+ useQuery && indexPath == null && viewPath == null
}
- def allowPartition(queryUsed: Boolean): Boolean = {indexName==null && !queryUsed}
+ def allowPartition(queryUsed: Boolean): Boolean = {indexPath == null && !queryUsed}
def getRangeUrl(field: String = null, start: Any = null,
startInclusive: Boolean = false, end: Any = null,
@@ -112,7 +148,7 @@ class CloudantConfig(val protocol: String, val host: String,
allowQuery: Boolean = false): (String, Boolean, Boolean) = {
val (url: String, pusheddown: Boolean, queryUsed: Boolean) =
calculate(field, start, startInclusive, end, endInclusive, allowQuery)
- if (includeDoc && !queryUsed ) {
+ if (includeDoc && !queryUsed) {
if (url.indexOf('?') > 0) {
(url + "&include_docs=true", pusheddown, queryUsed)
} else {
@@ -123,14 +159,12 @@ class CloudantConfig(val protocol: String, val host: String,
}
}
- /*
- * Url for paging using skip and limit options when loading docs with partitions.
- */
+
def getSubSetUrl(url: String, skip: Int, limit: Int, queryUsed: Boolean): String = {
val suffix = {
if (url.indexOf(JsonStoreConfigManager.ALL_DOCS_INDEX) > 0) {
"include_docs=true&limit=" + limit + "&skip=" + skip
- } else if (viewName != null) {
+ } else if (viewPath != null) {
"limit=" + limit + "&skip=" + skip
} else if (queryUsed) {
""
@@ -170,13 +204,13 @@ class CloudantConfig(val protocol: String, val host: String,
}
}
(dbUrl + "/" + defaultIndex + condition, true, false)
- } else if (indexName != null) {
+ } else if (indexPath != null) {
// push down to indexName
val condition = calculateCondition(field, start, startInclusive,
end, endInclusive)
- (dbUrl + "/" + indexName + "?q=" + condition, true, false)
- } else if (viewName != null) {
- (dbUrl + "/" + viewName, false, false)
+ (dbUrl + "/" + indexPath + "?q=" + condition, true, false)
+ } else if (viewPath != null) {
+ (dbUrl + "/" + viewPath, false, false)
} else if (allowQuery && useQuery) {
(s"$dbUrl/_find", false, true)
} else {
@@ -220,39 +254,41 @@ class CloudantConfig(val protocol: String, val host: String,
}
}
- def getTotalRows(result: JsValue): Int = {
- val resultKeys = result.as[JsObject].keys
- if(resultKeys.contains("total_rows")) {
- (result \ "total_rows").as[Int]
- } else if (resultKeys.contains("pending")) {
- (result \ "pending").as[Int] + 1
+ def getResultCount(result: String): Int = {
+ val jsonResult: JsonObject = new JsonParser().parse(result).getAsJsonObject
+ if (jsonResult.has("total_rows")) {
+ jsonResult.get("total_rows").getAsInt
+ } else if (jsonResult.has("pending")) {
+ jsonResult.get("pending").getAsInt + 1
} else {
1
}
}
- def getRows(result: JsValue, queryUsed: Boolean): Seq[JsValue] = {
- if ( queryUsed ) {
- (result \ "docs").as[JsArray].value.map(row => row)
+ def getRows(result: String, queryUsed: Boolean): Seq[JsonObject] = {
+ val jsonResult: JsonObject = new JsonParser().parse(result).getAsJsonObject
+ if (queryUsed) {
+ if (jsonResult.has("docs")) {
+ jsonResult.get("docs").getAsJsonArray.asScala
+ .map(row => row.getAsJsonObject).toSeq
+ } else {
+ Seq()
+ }
} else {
- val containsResultsKey: Boolean = result.as[JsObject].keys.contains("results")
- if (containsResultsKey) {
- (result \ "results").as[JsArray].value.map(row => (row \ "doc").get)
- } else if (viewName == null) {
- (result \ "rows").as[JsArray].value.map(row => (row \ "doc").get)
+ if (jsonResult.has("results")) {
+ jsonResult.get("result").getAsJsonArray.asScala.map(row => row.getAsJsonObject
+ .get("doc").getAsJsonObject).toSeq
+ } else if (viewPath == null) {
+ jsonResult.get("rows").getAsJsonArray.asScala.map(row => row.getAsJsonObject
+ .get("doc").getAsJsonObject).toSeq
} else {
- (result \ "rows").as[JsArray].value.map(row => row)
+ jsonResult.get("rows").getAsJsonArray.asScala.map(row => row.getAsJsonObject).toSeq
}
}
}
- def getBulkPostUrl: String = {
- dbUrl + "/_bulk_docs"
- }
-
- def getBulkRows(rows: List[String]): String = {
- val docs = rows.map { x => Json.parse(x) }
- Json.stringify(Json.obj("docs" -> Json.toJson(docs.toSeq)))
+ def getBulkRows(rows: List[String]): List[JsonObject] = {
+ rows.map { x => JsonConverter.toJson(x).getAsJsonObject }
}
def getConflictErrStr: String = {
http://git-wip-us.apache.org/repos/asf/bahir/blob/6ea42a89/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/CloudantReceiver.scala
----------------------------------------------------------------------
diff --git a/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/CloudantReceiver.scala b/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/CloudantReceiver.scala
index 60a7d4a..029cabc 100644
--- a/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/CloudantReceiver.scala
+++ b/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/CloudantReceiver.scala
@@ -16,8 +16,10 @@
*/
package org.apache.bahir.cloudant
-import play.api.libs.json.Json
-import scalaj.http._
+import java.io.{BufferedReader, InputStreamReader}
+import java.util.concurrent.TimeUnit
+
+import okhttp3._
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
@@ -42,50 +44,43 @@ class CloudantReceiver(sparkConf: SparkConf, cloudantParams: Map[String, String]
}
private def receive(): Unit = {
- val url = config.getContinuousChangesUrl.toString
- val selector: String = if (config.getSelector != null) {
- "{\"selector\":" + config.getSelector + "}"
- } else {
- "{}"
- }
+ val okHttpClient: OkHttpClient = new OkHttpClient.Builder()
+ .connectTimeout(5, TimeUnit.SECONDS)
+ .readTimeout(60, TimeUnit.SECONDS)
+ .build
+ val url = config.getChangesReceiverUrl.toString
- val clRequest: HttpRequest = config.username match {
- case null =>
- Http(url)
- .postData(selector)
- .timeout(connTimeoutMs = 1000, readTimeoutMs = 0)
- .header("Content-Type", "application/json")
- .header("User-Agent", "spark-cloudant")
- case _ =>
- Http(url)
- .postData(selector)
- .timeout(connTimeoutMs = 1000, readTimeoutMs = 0)
- .header("Content-Type", "application/json")
- .header("User-Agent", "spark-cloudant")
- .auth(config.username, config.password)
+ val builder = new Request.Builder().url(url)
+ if (config.username != null) {
+ val credential = Credentials.basic(config.username, config.password)
+ builder.header("Authorization", credential)
+ }
+ if(config.getSelector != null) {
+ val jsonType = MediaType.parse("application/json; charset=utf-8")
+ val selector = "{\"selector\":" + config.getSelector + "}"
+ val selectorBody = RequestBody.create(jsonType, selector)
+ builder.post(selectorBody)
}
- clRequest.exec((code, headers, is) => {
- if (code == 200) {
- scala.io.Source.fromInputStream(is, "utf-8").getLines().foreach(line => {
- if (line.length() > 0) {
- val json = Json.parse(line)
- val jsonDoc = (json \ "doc").getOrElse(null)
- var doc = ""
- if(jsonDoc != null) {
- doc = Json.stringify(jsonDoc)
- if(!isStopped() && doc.nonEmpty) {
- store(doc)
- }
- }
+ val request = builder.build
+ val response = okHttpClient.newCall(request).execute
+ val status_code = response.code
+
+ if (status_code == 200) {
+ val changesInputStream = response.body.byteStream
+ var json = new ChangesRow()
+ if (changesInputStream != null) {
+ val bufferedReader = new BufferedReader(new InputStreamReader(changesInputStream))
+ while ((json = ChangesRowScanner.readRowFromReader(bufferedReader)) != null) {
+ if (!isStopped() && json != null && !json.getDoc.has("_deleted")) {
+ store(json.getDoc.toString)
}
- })
- } else {
- val status = headers.getOrElse("Status", IndexedSeq.empty)
- val errorMsg = "Error retrieving _changes feed " + config.getDbname + ": " + status(0)
- reportError(errorMsg, new CloudantException(errorMsg))
+ }
}
- })
+ } else {
+ val errorMsg = "Error retrieving _changes feed " + config.getDbname + ": " + status_code
+ reportError(errorMsg, new CloudantException(errorMsg))
+ }
}
def onStop(): Unit = {
http://git-wip-us.apache.org/repos/asf/bahir/blob/6ea42a89/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/DefaultSource.scala
----------------------------------------------------------------------
diff --git a/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/DefaultSource.scala b/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/DefaultSource.scala
index 2685993..47643cc 100644
--- a/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/DefaultSource.scala
+++ b/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/DefaultSource.scala
@@ -109,11 +109,11 @@ class DefaultSource extends RelationProvider
if (inSchema != null) {
inSchema
} else if (!config.isInstanceOf[CloudantChangesConfig]
- || config.viewName != null || config.indexName != null) {
+ || config.viewPath != null || config.indexPath != null) {
val df = if (config.getSchemaSampleSize ==
JsonStoreConfigManager.ALLDOCS_OR_CHANGES_LIMIT &&
- config.viewName == null
- && config.indexName == null) {
+ config.viewPath == null
+ && config.indexPath == null) {
val cloudantRDD = new JsonStoreRDD(sqlContext.sparkContext, config)
dataFrame = sqlContext.read.json(cloudantRDD.toDS())
dataFrame
@@ -144,12 +144,7 @@ class DefaultSource extends RelationProvider
// Collect and union each RDD to convert all RDDs to a DataFrame
changes.foreachRDD((rdd: RDD[String]) => {
if (!rdd.isEmpty()) {
- if (globalRDD != null) {
- // Union RDDs in foreach loop
- globalRDD = globalRDD.union(rdd)
- } else {
- globalRDD = rdd
- }
+ globalRDD = rdd ++ globalRDD
} else {
// Convert final global RDD[String] to DataFrame
dataFrame = sqlContext.sparkSession.read.json(globalRDD.toDS())
http://git-wip-us.apache.org/repos/asf/bahir/blob/6ea42a89/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/common/FilterUtil.scala
----------------------------------------------------------------------
diff --git a/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/common/FilterUtil.scala b/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/common/FilterUtil.scala
index ef1f7da..79f681d 100644
--- a/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/common/FilterUtil.scala
+++ b/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/common/FilterUtil.scala
@@ -16,12 +16,11 @@
*/
package org.apache.bahir.cloudant.common
+import com.google.gson.JsonObject
import org.slf4j.LoggerFactory
-import play.api.libs.json.{JsObject, JsString, JsValue}
import org.apache.spark.sql.sources._
-
/**
* Only handles the following filter condition
* 1. EqualTo,GreaterThan,LessThan,GreaterThanOrEqual,LessThanOrEqual,In
@@ -118,11 +117,11 @@ class FilterInterpreter(origFilters: Array[Filter]) {
*/
class FilterUtil(filters: Map[String, Array[Filter]]) {
private val logger = LoggerFactory.getLogger(getClass)
- def apply(implicit r: JsValue = null): Boolean = {
+ def apply(implicit r: JsonObject = null): Boolean = {
if (r == null) return true
val satisfied = filters.forall({
case (attr, filters) =>
- val field = JsonUtil.getField(r, attr).getOrElse(null)
+ val field = JsonUtil.getField(r, attr).orNull
if (field == null) {
logger.debug(s"field $attr not exisit:$r")
false
@@ -136,12 +135,10 @@ class FilterUtil(filters: Map[String, Array[Filter]]) {
object FilterDDocs {
- def filter(row: JsValue): Boolean = {
+ def filter(row: JsonObject): Boolean = {
if (row == null) return true
- val id : String = if (row.as[JsObject].keys.contains("_id")) {
- JsonUtil.getField(row, "_id").orNull.as[JsString].value
- } else if (row.as[JsObject].keys.contains("id")) {
- JsonUtil.getField(row, "id").orNull.as[JsString].value
+ val id : String = if (row.has("_id")) {
+ row.get("_id").getAsString
} else {
null
}
http://git-wip-us.apache.org/repos/asf/bahir/blob/6ea42a89/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/common/JsonStoreConfigManager.scala
----------------------------------------------------------------------
diff --git a/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/common/JsonStoreConfigManager.scala b/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/common/JsonStoreConfigManager.scala
index 9cd495d..0e66f03 100644
--- a/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/common/JsonStoreConfigManager.scala
+++ b/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/common/JsonStoreConfigManager.scala
@@ -40,6 +40,7 @@ object JsonStoreConfigManager {
private val CLOUDANT_CHANGES_TIMEOUT = "cloudant.timeout"
private val USE_QUERY_CONFIG = "cloudant.useQuery"
private val QUERY_LIMIT_CONFIG = "cloudant.queryLimit"
+ private val NUMBER_OF_RETRIES = "cloudant.numberOfRetries"
private val FILTER_SELECTOR = "selector"
private val PARTITION_CONFIG = "jsonstore.rdd.partitions"
@@ -68,22 +69,27 @@ object JsonStoreConfigManager {
private def getInt(sparkConf: SparkConf, parameters: Map[String, String],
key: String) : Int = {
- val valueS = parameters.getOrElse(key, null)
- if (sparkConf != null) {
- val default = {
+ try {
+ val valueS = parameters.getOrElse(key, null)
+ if (sparkConf != null) {
+ val default = {
+ if (valueS == null) {
+ sparkConf.getInt(key, rootConfig.getInt(key))
+ } else {
+ valueS.toInt
+ }
+ }
+ sparkConf.getInt(s"spark.$key", default)
+ } else {
if (valueS == null) {
- sparkConf.getInt(key, rootConfig.getInt(key))
+ rootConfig.getInt(key)
} else {
valueS.toInt
}
}
- sparkConf.getInt(s"spark.$key", default)
- } else {
- if (valueS == null) {
- rootConfig.getInt(key)
- } else {
- valueS.toInt
- }
+ } catch {
+ case e: NumberFormatException =>
+ throw new CloudantException(s"Option \'$key\' failed with exception $e")
}
}
@@ -162,22 +168,24 @@ object JsonStoreConfigManager {
def getConfig (sparkConf: SparkConf, parameters: Map[String, String]): CloudantConfig = {
- implicit val total = getInt(sparkConf, parameters, PARTITION_CONFIG)
- implicit val max = getInt(sparkConf, parameters, MAX_IN_PARTITION_CONFIG)
- implicit val min = getInt(sparkConf, parameters, MIN_IN_PARTITION_CONFIG)
- implicit val requestTimeout = getLong(sparkConf, parameters, REQUEST_TIMEOUT_CONFIG)
- implicit val bulkSize = getInt(sparkConf, parameters, BULK_SIZE_CONFIG)
- implicit val schemaSampleSize = getInt(sparkConf, parameters, SCHEMA_SAMPLE_SIZE_CONFIG)
- implicit val createDBOnSave = getBool(sparkConf, parameters, CREATE_DB_ON_SAVE_CONFIG)
- implicit val endpoint = getString(sparkConf, parameters, CLOUDANT_API_ENDPOINT)
- implicit val selector = getString(sparkConf, parameters, FILTER_SELECTOR)
- implicit val storageLevel = getStorageLevel(
+ implicit val total: Int = getInt(sparkConf, parameters, PARTITION_CONFIG)
+ implicit val max: Int = getInt(sparkConf, parameters, MAX_IN_PARTITION_CONFIG)
+ implicit val min: Int = getInt(sparkConf, parameters, MIN_IN_PARTITION_CONFIG)
+ implicit val requestTimeout: Long = getLong(sparkConf, parameters, REQUEST_TIMEOUT_CONFIG)
+ implicit val bulkSize: Int = getInt(sparkConf, parameters, BULK_SIZE_CONFIG)
+ implicit val schemaSampleSize: Int = getInt(sparkConf, parameters, SCHEMA_SAMPLE_SIZE_CONFIG)
+ implicit val createDBOnSave: Boolean = getBool(sparkConf, parameters, CREATE_DB_ON_SAVE_CONFIG)
+ implicit val endpoint: String = getString(sparkConf, parameters, CLOUDANT_API_ENDPOINT)
+ implicit val selector: String = getString(sparkConf, parameters, FILTER_SELECTOR)
+ implicit val storageLevel: StorageLevel = getStorageLevel(
sparkConf, parameters, STORAGE_LEVEL_FOR_CHANGES_INDEX)
- implicit val timeout = getInt(sparkConf, parameters, CLOUDANT_CHANGES_TIMEOUT)
- implicit val batchInterval = getInt(sparkConf, parameters, CLOUDANT_STREAMING_BATCH_INTERVAL)
+ implicit val timeout: Int = getInt(sparkConf, parameters, CLOUDANT_CHANGES_TIMEOUT)
+ implicit val batchInterval: Int = getInt(
+ sparkConf, parameters, CLOUDANT_STREAMING_BATCH_INTERVAL)
+ implicit val numberOfRetries: Int = getInt(sparkConf, parameters, NUMBER_OF_RETRIES)
- implicit val useQuery = getBool(sparkConf, parameters, USE_QUERY_CONFIG)
- implicit val queryLimit = getInt(sparkConf, parameters, QUERY_LIMIT_CONFIG)
+ implicit val useQuery: Boolean = getBool(sparkConf, parameters, USE_QUERY_CONFIG)
+ implicit val queryLimit: Int = getInt(sparkConf, parameters, QUERY_LIMIT_CONFIG)
val dbName = parameters.getOrElse("database", parameters.getOrElse("path",
throw new CloudantException(s"Cloudant database name is empty. " +
@@ -193,13 +201,12 @@ object JsonStoreConfigManager {
if (endpoint == ALL_DOCS_INDEX) {
new CloudantConfig(protocol, host, dbName, indexName,
viewName) (user, passwd, total, max, min, requestTimeout, bulkSize,
- schemaSampleSize, createDBOnSave, endpoint, useQuery,
- queryLimit)
+ schemaSampleSize, createDBOnSave, endpoint, useQuery, queryLimit, numberOfRetries)
} else if (endpoint == CHANGES_INDEX) {
new CloudantChangesConfig(protocol, host, dbName, indexName,
viewName) (user, passwd, total, max, min, requestTimeout,
bulkSize, schemaSampleSize, createDBOnSave, endpoint, selector,
- timeout, storageLevel, useQuery, queryLimit, batchInterval)
+ timeout, storageLevel, useQuery, queryLimit, batchInterval, numberOfRetries)
} else {
throw new CloudantException(s"spark.$CLOUDANT_API_ENDPOINT parameter " +
s"is invalid. Please supply the valid option '" + ALL_DOCS_INDEX + "' or '" +
http://git-wip-us.apache.org/repos/asf/bahir/blob/6ea42a89/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/common/JsonStoreDataAccess.scala
----------------------------------------------------------------------
diff --git a/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/common/JsonStoreDataAccess.scala b/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/common/JsonStoreDataAccess.scala
index 9d09ecb..3400468 100644
--- a/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/common/JsonStoreDataAccess.scala
+++ b/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/common/JsonStoreDataAccess.scala
@@ -18,32 +18,28 @@ package org.apache.bahir.cloudant.common
import java.util.concurrent.atomic.AtomicInteger
+import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.concurrent._
import scala.concurrent.duration._
import scala.language.implicitConversions
import scala.util.{Failure, Success}
-import scalaj.http.{Http, HttpRequest, HttpResponse}
import ExecutionContext.Implicits.global
+import com.cloudant.client.api.model.Response
+import com.cloudant.http.HttpConnection
+import com.google.gson.{Gson, JsonElement, JsonObject}
import org.slf4j.{Logger, LoggerFactory}
-import play.api.libs.json._
import org.apache.bahir.cloudant.CloudantConfig
-
class JsonStoreDataAccess (config: CloudantConfig) {
lazy val logger: Logger = LoggerFactory.getLogger(getClass)
implicit lazy val timeout: Long = config.requestTimeout
def getMany(limit: Int)(implicit columns: Array[String] = null): Seq[String] = {
- if (limit == 0) {
- throw new CloudantException("Database " + config.getDbname +
- " schema sample size is 0!")
- }
- if (limit < -1) {
- throw new CloudantException("Database " + config.getDbname +
- " schema sample size is " + limit + "!")
+ if (limit == 0 || limit < -1) {
+ throw new CloudantException("Schema size '" + limit + "' is not valid.")
}
var r = this.getQueryResult[Seq[String]](config.getUrl(limit), processAll)
if (r.isEmpty) {
@@ -58,35 +54,30 @@ class JsonStoreDataAccess (config: CloudantConfig) {
}
}
- def getAll[T](url: String)
- (implicit columns: Array[String] = null): Seq[String] = {
- this.getQueryResult[Seq[String]](url, processAll)
- }
-
def getIterator(skip: Int, limit: Int, url: String)
(implicit columns: Array[String] = null,
postData: String = null): Iterator[String] = {
+ logger.info(s"Loading data from Cloudant using: $url , postData: $postData")
val newUrl = config.getSubSetUrl(url, skip, limit, postData != null)
this.getQueryResult[Iterator[String]](newUrl, processIterator)
}
def getTotalRows(url: String, queryUsed: Boolean)
- (implicit postData: String = null): Int = {
- if (queryUsed) config.queryLimit // Query can not retrieve total row now.
- else {
- val totalUrl = config.getTotalUrl(url)
- this.getQueryResult[Int](totalUrl,
- { result => config.getTotalRows(Json.parse(result))})
- }
+ (implicit postData: String = null): Int = {
+ if (queryUsed) config.queryLimit // Query can not retrieve total row now.
+ else {
+ val totalUrl = config.getTotalUrl(url)
+ this.getQueryResult[Int](totalUrl,
+ { result => config.getResultCount(result)})
+ }
}
- private def processAll (result: String)
+ private def processAll(result: String)
(implicit columns: Array[String],
postData: String = null) = {
logger.debug(s"processAll:$result, columns:$columns")
- val jsonResult: JsValue = Json.parse(result)
- var rows = config.getRows(jsonResult, postData != null )
- if (config.viewName == null && postData == null) {
+ var rows = config.getRows(result, postData != null)
+ if (config.viewPath == null && postData == null) {
// filter design docs
rows = rows.filter(r => FilterDDocs.filter(r))
}
@@ -99,104 +90,43 @@ class JsonStoreDataAccess (config: CloudantConfig) {
processAll(result).iterator
}
- private def convert(rec: JsValue)(implicit columns: Array[String]): String = {
- if (columns == null) return Json.stringify(Json.toJson(rec))
- val m = new mutable.HashMap[String, JsValue]()
- for ( x <- columns) {
- val field = JsonUtil.getField(rec, x).getOrElse(JsNull)
- m.put(x, field)
+ private def convert(rec: JsonElement)(implicit columns: Array[String]): String = {
+ if (columns == null) {
+ rec.getAsJsonObject.toString
+ } else {
+ val jsonObject = new JsonObject
+ for (x <- columns) {
+ val field = JsonUtil.getField(rec, x).orNull
+ jsonObject.add(x, field)
+ }
+ val result = jsonObject.toString
+ logger.debug(s"converted: $result")
+ result
}
- val result = Json.stringify(Json.toJson(m.toMap))
- logger.debug(s"converted: $result")
- result
- }
-
-
- def getChanges(url: String, processResults: (String) => String): String = {
- getQueryResult(url, processResults)
}
private def getQueryResult[T]
- (url: String, postProcessor: (String) => T)
- (implicit columns: Array[String] = null,
- postData: String = null) : T = {
+ (url: String, postProcessor: (String) => T)
+ (implicit columns: Array[String] = null,
+ postData: String = null) : T = {
logger.info(s"Loading data from Cloudant using: $url , postData: $postData")
- val clRequest: HttpRequest = getClRequest(url, postData)
+ val clRequest: HttpConnection = config.executeRequest(url, postData)
- val clResponse: HttpResponse[String] = clRequest.execute()
- if (! clResponse.isSuccess) {
+ val clResponse: HttpConnection = clRequest.execute()
+ if (clResponse.getConnection.getResponseCode != 200) {
throw new CloudantException("Database " + config.getDbname +
- " request error: " + clResponse.body)
+ " request error: " + clResponse.responseAsString)
}
- val data = postProcessor(clResponse.body)
- logger.debug(s"got result: $data")
+ val data = postProcessor(clResponse.responseAsString)
+ logger.debug(s"got result:$data")
data
}
def createDB(): Unit = {
- val url = config.getDbUrl.toString
- val clRequest: HttpRequest = getClRequest(url, null, "PUT")
-
- val clResponse: HttpResponse[String] = clRequest.execute()
- if (! clResponse.isSuccess) {
- throw new CloudantException("Database " + config.getDbname +
- " create error: " + clResponse.body)
- } else {
- logger.warn(s"Database ${config.getDbname} was created.")
- }
+ config.getClient.createDB(config.getDbname)
}
-
- def getClRequest(url: String, postData: String = null,
- httpMethod: String = null): HttpRequest = {
- val requestTimeout = config.requestTimeout.toInt
- config.username match {
- case null =>
- if (postData != null) {
- Http(url)
- .postData(postData)
- .timeout(connTimeoutMs = 1000, readTimeoutMs = requestTimeout)
- .header("Content-Type", "application/json")
- .header("User-Agent", "spark-cloudant")
- } else {
- if (httpMethod != null) {
- Http(url)
- .method(httpMethod)
- .timeout(connTimeoutMs = 1000, readTimeoutMs = requestTimeout)
- .header("User-Agent", "spark-cloudant")
- } else {
- Http(url)
- .timeout(connTimeoutMs = 1000, readTimeoutMs = requestTimeout)
- .header("User-Agent", "spark-cloudant")
- }
- }
- case _ =>
- if (postData != null) {
- Http(url)
- .postData(postData)
- .timeout(connTimeoutMs = 1000, readTimeoutMs = requestTimeout)
- .header("Content-Type", "application/json")
- .header("User-Agent", "spark-cloudant")
- .auth(config.username, config.password)
- } else {
- if (httpMethod != null) {
- Http(url)
- .method(httpMethod)
- .timeout(connTimeoutMs = 1000, readTimeoutMs = requestTimeout)
- .header("User-Agent", "spark-cloudant")
- .auth(config.username, config.password)
- } else {
- Http(url)
- .timeout(connTimeoutMs = 1000, readTimeoutMs = requestTimeout)
- .header("User-Agent", "spark-cloudant")
- .auth(config.username, config.password)
- }
- }
- }
- }
-
-
def saveAll(rows: List[String]): Unit = {
if (rows.isEmpty) return
val bulkSize = config.bulkSize
@@ -205,31 +135,29 @@ class JsonStoreDataAccess (config: CloudantConfig) {
logger.debug(s"total records:${rows.size}=bulkSize:$bulkSize * totalBulks:$totalBulks")
val futures = bulks.map( bulk => {
- val data = config.getBulkRows(bulk)
- val url = config.getBulkPostUrl.toString
- val clRequest: HttpRequest = getClRequest(url, data)
+ val jsonData = config.getBulkRows(bulk)
Future {
- clRequest.execute()
+ config.getDatabase.bulk(jsonData.asJava)
}
}
)
// remaining - number of requests remained to succeed
val remaining = new AtomicInteger(futures.length)
- val p = Promise[HttpResponse[String]]
+ val p = Promise[java.util.List[Response]]
futures foreach {
_ onComplete {
- case Success(clResponse: HttpResponse[String]) =>
- // find if there was error in saving at least one of docs
- val resBody: String = clResponse.body
- val isErr = (resBody contains config.getConflictErrStr) ||
- (resBody contains config.getForbiddenErrStr)
- if (!clResponse.isSuccess || isErr) {
+ case Success(clResponses) =>
+ if (clResponses contains config.getConflictErrStr) {
+ val e = new CloudantException("Save to database:" + config.getDbname +
+ " failed with reason: " + config.getConflictErrStr)
+ p.tryFailure(e)
+ } else if (clResponses contains config.getForbiddenErrStr) {
val e = new CloudantException("Save to database:" + config.getDbname +
- " failed with reason: " + clResponse.body)
+ " failed with reason: " + config.getForbiddenErrStr)
p.tryFailure(e)
} else if (remaining.decrementAndGet() == 0) {
// succeed the whole save operation if all requests success
- p.trySuccess(clResponse)
+ p.trySuccess(clResponses)
}
// if a least one save request fails - fail the whole save operation
case Failure(e) =>
http://git-wip-us.apache.org/repos/asf/bahir/blob/6ea42a89/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/common/JsonStoreRDD.scala
----------------------------------------------------------------------
diff --git a/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/common/JsonStoreRDD.scala b/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/common/JsonStoreRDD.scala
index c4f828d..2ba480d 100644
--- a/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/common/JsonStoreRDD.scala
+++ b/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/common/JsonStoreRDD.scala
@@ -16,8 +16,8 @@
*/
package org.apache.bahir.cloudant.common
+import com.google.gson._
import org.slf4j.LoggerFactory
-import play.api.libs.json.{Json, JsValue}
import org.apache.spark.Partition
import org.apache.spark.SparkContext
@@ -26,6 +26,7 @@ import org.apache.spark.rdd.RDD
import org.apache.spark.sql.sources._
import org.apache.bahir.cloudant.CloudantConfig
+import org.apache.bahir.cloudant.common.JsonUtil.JsonConverter
/**
* JsonStoreRDDPartition defines each partition as a subset of a query result:
@@ -33,8 +34,9 @@ import org.apache.bahir.cloudant.CloudantConfig
*/
private[cloudant] class JsonStoreRDDPartition(val url: String, val skip: Int, val limit: Int,
- val idx: Int, val config: CloudantConfig, val selector: JsValue, val fields: JsValue,
- val queryUsed: Boolean)
+ val idx: Int, val config: CloudantConfig,
+ val selector: JsonElement, val fields: JsonElement,
+ val queryUsed: Boolean)
extends Partition with Serializable{
val index: Int = idx
}
@@ -92,7 +94,9 @@ class JsonStoreRDD(sc: SparkContext, config: CloudantConfig)
}
}
- private def convertToMangoJson(f: Filter): (String, JsValue) = {
+ private def convertToMangoJson(f: Filter): (String, JsonElement) = {
+ val gson = new Gson
+ val parser = new JsonParser()
val (op, value): (String, Any) = f match {
case EqualTo(attr, v) => ("$eq", v)
case GreaterThan(attr, v) => ("$gt", v)
@@ -101,16 +105,16 @@ class JsonStoreRDD(sc: SparkContext, config: CloudantConfig)
case LessThanOrEqual(attr, v) => ("$lte", v)
case _ => (null, null)
}
- val convertedV: JsValue = {
+ val convertedV: JsonElement = {
// TODO Better handing of other types
if (value != null) {
value match {
- case s: String => Json.toJson(s)
- case l: Long => Json.toJson(l)
- case d: Double => Json.toJson(d)
- case i: Int => Json.toJson(i)
- case b: Boolean => Json.toJson(b)
- case t: java.sql.Timestamp => Json.toJson(t)
+ case s: String => parser.parse(s)
+ case l: Long => parser.parse(l.toString)
+ case d: Double => parser.parse(d.toString)
+ case i: Int => parser.parse(i.toString)
+ case b: Boolean => parser.parse(b.toString)
+ case t: java.sql.Timestamp => parser.parse(t.toString)
case a: Any => logger.debug(s"Ignore field:$name, cannot handle its datatype: $a"); null
}
} else null
@@ -118,7 +122,7 @@ class JsonStoreRDD(sc: SparkContext, config: CloudantConfig)
(op, convertedV)
}
- private def convertAttrToMangoJson(filters: Array[Filter]): Map[String, JsValue] = {
+ private def convertAttrToMangoJson(filters: Array[Filter]): Map[String, JsonElement] = {
filters.map(af => convertToMangoJson(af))
.filter(x => x._2 != null)
.toMap
@@ -137,10 +141,10 @@ class JsonStoreRDD(sc: SparkContext, config: CloudantConfig)
}
}
- val (selector, fields) : (JsValue, JsValue) = {
+ val (selector, fields) : (JsonElement, JsonElement) = {
if (!config.queryEnabled || origAttrToFilters == null) (null, null)
else {
- val selectors: Map[String, Map[String, JsValue]] =
+ val selectors: Map[String, Map[String, JsonElement]] =
origAttrToFilters.transform( (name, attrFilters) => convertAttrToMangoJson(attrFilters))
val filteredSelectors = selectors.filter((t) => t._2.nonEmpty)
@@ -149,10 +153,10 @@ class JsonStoreRDD(sc: SparkContext, config: CloudantConfig)
if (requiredcolumns == null || requiredcolumns.length == 0) {
null
} else {
- Json.toJson(requiredcolumns)
+ JsonConverter.toJson(requiredcolumns)
}
}
- (Json.toJson(filteredSelectors), queryColumns)
+ (JsonConverter.toJson(filteredSelectors), queryColumns)
} else (null, null)
}
}
@@ -174,7 +178,10 @@ class JsonStoreRDD(sc: SparkContext, config: CloudantConfig)
implicit val postData : String = {
if (queryUsed) {
- Json.stringify(Json.obj("selector" -> selector, "limit" -> 1))
+ val jsonSelector = new JsonObject
+ jsonSelector.addProperty("selector", selector.toString)
+ jsonSelector.addProperty("limit", 1)
+ jsonSelector.toString
} else {
null
}
@@ -191,7 +198,8 @@ class JsonStoreRDD(sc: SparkContext, config: CloudantConfig)
(0 until totalPartition).map(i => {
val skip = i * limitPerPartition
new JsonStoreRDDPartition(url, skip, limitPerPartition, i,
- config, selector, fields, queryUsed).asInstanceOf[Partition]
+ config, selector, fields, queryUsed)
+ .asInstanceOf[Partition]
}).toArray
}
@@ -199,12 +207,15 @@ class JsonStoreRDD(sc: SparkContext, config: CloudantConfig)
Iterator[String] = {
val myPartition = splitIn.asInstanceOf[JsonStoreRDDPartition]
implicit val postData : String = {
+ val jsonObject = new JsonObject
+ jsonObject.add("selector", myPartition.selector)
+ jsonObject.addProperty("skip", myPartition.skip)
if (myPartition.queryUsed && myPartition.fields != null) {
- Json.stringify(Json.obj("selector" -> myPartition.selector, "fields" -> myPartition.fields,
- "limit" -> myPartition.limit, "skip" -> myPartition.skip))
+ jsonObject.add("fields", myPartition.fields)
+ jsonObject.toString
} else if (myPartition.queryUsed) {
- Json.stringify(Json.obj("selector" -> myPartition.selector, "limit" -> myPartition.limit,
- "skip" -> myPartition.skip))
+ jsonObject.addProperty("limit", myPartition.limit)
+ jsonObject.toString
} else {
null
}
http://git-wip-us.apache.org/repos/asf/bahir/blob/6ea42a89/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/common/JsonUtil.scala
----------------------------------------------------------------------
diff --git a/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/common/JsonUtil.scala b/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/common/JsonUtil.scala
index 82d9afc..eadceea 100644
--- a/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/common/JsonUtil.scala
+++ b/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/common/JsonUtil.scala
@@ -16,27 +16,38 @@
*/
package org.apache.bahir.cloudant.common
-import play.api.libs.json.JsValue
import scala.util.control.Breaks._
+import com.google.gson.{JsonElement, JsonParser}
+
object JsonUtil {
- def getField(row: JsValue, field: String) : Option[JsValue] = {
+ def getField(row: JsonElement, field: String) : Option[JsonElement] = {
var path = field.split('.')
var currentValue = row
- var finalValue: Option[JsValue] = None
+ var finalValue: Option[JsonElement] = None
breakable {
for (i <- path.indices) {
- val f: Option[JsValue] = (currentValue \ path(i)).toOption
- f match {
- case Some(f2) => currentValue = f2
- case None => break
- }
- if (i == path.length -1) {
- // The leaf node
- finalValue = Some(currentValue)
+ if (currentValue != null && currentValue.isJsonObject) {
+ val f: Option[JsonElement] =
+ Option(currentValue.getAsJsonObject.get(path(i)))
+ f match {
+ case Some(f2) => currentValue = f2
+ case None => break
+ }
+ if (i == path.length - 1) {
+ // The leaf node
+ finalValue = Some(currentValue)
+ }
}
}
}
finalValue
}
+
+ object JsonConverter {
+ val parser = new JsonParser
+ def toJson(value: Any): JsonElement = {
+ parser.parse(value.toString)
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/bahir/blob/6ea42a89/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/internal/ChangesReceiver.scala
----------------------------------------------------------------------
diff --git a/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/internal/ChangesReceiver.scala b/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/internal/ChangesReceiver.scala
index 323aab6..56671b3 100644
--- a/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/internal/ChangesReceiver.scala
+++ b/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/internal/ChangesReceiver.scala
@@ -17,8 +17,10 @@
package org.apache.bahir.cloudant.internal
import java.io.{BufferedReader, InputStreamReader}
+import java.util.concurrent.TimeUnit
-import scalaj.http._
+import com.google.gson.JsonParser
+import okhttp3._
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver
@@ -37,45 +39,46 @@ class ChangesReceiver(config: CloudantChangesConfig)
}
private def receive(): Unit = {
- // Get normal _changes url
+ val okHttpClient: OkHttpClient = new OkHttpClient.Builder()
+ .connectTimeout(5, TimeUnit.SECONDS)
+ .readTimeout(60, TimeUnit.SECONDS)
+ .build
val url = config.getChangesReceiverUrl.toString
- val selector: String = {
- "{\"selector\":" + config.getSelector + "}"
- }
- val clRequest: HttpRequest = config.username match {
- case null =>
- Http(url)
- .postData(selector)
- .header("Content-Type", "application/json")
- .header("User-Agent", "spark-cloudant")
- case _ =>
- Http(url)
- .postData(selector)
- .header("Content-Type", "application/json")
- .header("User-Agent", "spark-cloudant")
- .auth(config.username, config.password)
+ val builder = new Request.Builder().url(url)
+ if (config.username != null) {
+ val credential = Credentials.basic(config.username, config.password)
+ builder.header("Authorization", credential)
+ }
+ if(config.getSelector != null) {
+ val jsonType = MediaType.parse("application/json; charset=utf-8")
+ val selector = "{\"selector\":" + config.getSelector + "}"
+ val selectorBody = RequestBody.create(jsonType, selector)
+ builder.post(selectorBody)
}
- clRequest.exec((code, headers, is) => {
- if (code == 200) {
- var json = new ChangesRow()
- if (is != null) {
- val bufferedReader = new BufferedReader(new InputStreamReader(is))
- while ((json = ChangesRowScanner.readRowFromReader(bufferedReader)) != null) {
- if (!isStopped() && json != null && !json.getDoc.has("_deleted")) {
- store(json.getDoc.toString)
- }
+ val request = builder.build
+ val response = okHttpClient.newCall(request).execute
+ val status_code = response.code
+
+ if (status_code == 200) {
+ val changesInputStream = response.body.byteStream
+ var json = new ChangesRow()
+ if (changesInputStream != null) {
+ val bufferedReader = new BufferedReader(new InputStreamReader(changesInputStream))
+ while ((json = ChangesRowScanner.readRowFromReader(bufferedReader)) != null) {
+ if (!isStopped() && json != null && !json.getDoc.has("_deleted")) {
+ store(json.getDoc.toString)
}
}
- } else {
- val status = headers.getOrElse("Status", IndexedSeq.empty)
- val errorMsg = "Error retrieving _changes feed data from database " +
- "'" + config.getDbname + "': " + status(0)
- reportError(errorMsg, new CloudantException(errorMsg))
- CloudantChangesConfig.receiverErrorMsg = errorMsg
}
- })
+ } else {
+ val responseAsJson = new JsonParser().parse(response.body.string)
+ val errorMsg = "Error retrieving _changes feed data from database " + "'" +
+ config.getDbname + "' with response code " + status_code + ": " + responseAsJson.toString
+ reportError(errorMsg, new CloudantException(errorMsg))
+ CloudantChangesConfig.receiverErrorMsg = errorMsg
+ }
}
override def onStop(): Unit = {
http://git-wip-us.apache.org/repos/asf/bahir/blob/6ea42a89/sql-cloudant/src/test/scala/org/apache/bahir/cloudant/CloudantOptionSuite.scala
----------------------------------------------------------------------
diff --git a/sql-cloudant/src/test/scala/org/apache/bahir/cloudant/CloudantOptionSuite.scala b/sql-cloudant/src/test/scala/org/apache/bahir/cloudant/CloudantOptionSuite.scala
index 8495026..c487937 100644
--- a/sql-cloudant/src/test/scala/org/apache/bahir/cloudant/CloudantOptionSuite.scala
+++ b/sql-cloudant/src/test/scala/org/apache/bahir/cloudant/CloudantOptionSuite.scala
@@ -42,7 +42,6 @@ class CloudantOptionSuite extends ClientSparkFunSuite with BeforeAndAfter {
}
assert(thrown.getMessage === s"spark.cloudant.endpoint parameter " +
s"is invalid. Please supply the valid option '_all_docs' or '_changes'.")
-
}
testIfEnabled("empty username option throws an error message") {
@@ -99,7 +98,35 @@ class CloudantOptionSuite extends ClientSparkFunSuite with BeforeAndAfter {
val thrown = intercept[CloudantException] {
spark.read.format("org.apache.bahir.cloudant").load("n_flight")
}
- assert(thrown.getMessage === s"Error retrieving _changes feed data" +
- s" from database 'n_flight': HTTP/1.1 401 Unauthorized")
+ assert(thrown.getMessage === "Error retrieving _changes feed data" +
+ " from database 'n_flight' with response code 401: {\"error\":\"unauthorized\"," +
+ "\"reason\":\"Name or password is incorrect.\"}")
+ }
+
+ testIfEnabled("string with valid value for cloudant.numberOfRetries option") {
+ spark = SparkSession.builder().config(conf)
+ .config("cloudant.host", TestUtils.getHost)
+ .config("cloudant.username", TestUtils.getUsername)
+ .config("cloudant.password", TestUtils.getPassword)
+ .config("cloudant.numberOfRetries", "5")
+ .getOrCreate()
+
+ val df = spark.read.format("org.apache.bahir.cloudant").load("n_booking")
+ assert(df.count() === 2)
+ }
+
+ testIfEnabled("invalid value for cloudant.numberOfRetries option throws an error message") {
+ spark = SparkSession.builder().config(conf)
+ .config("cloudant.host", TestUtils.getHost)
+ .config("cloudant.username", TestUtils.getUsername)
+ .config("cloudant.password", TestUtils.getPassword)
+ .config("cloudant.numberOfRetries", "five")
+ .getOrCreate()
+
+ val thrown = intercept[CloudantException] {
+ spark.read.format("org.apache.bahir.cloudant").load("db")
+ }
+ assert(thrown.getMessage === s"Option \'cloudant.numberOfRetries\' failed with exception " +
+ s"""java.lang.NumberFormatException: For input string: "five"""")
}
}