You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@bahir.apache.org by emlaver <gi...@git.apache.org> on 2018/01/03 20:08:51 UTC

[GitHub] bahir pull request #61: [BAHIR-154] Refactor sql-cloudant to use Cloudant's ...

GitHub user emlaver opened a pull request:

    https://github.com/apache/bahir/pull/61

    [BAHIR-154] Refactor sql-cloudant to use Cloudant's java-cloudant features

    _What_ 
    Refactor sql-cloudant to use Cloudant's `java-cloudant` features
    
    _How_
    - Use java-cloudant’s executeRequest for HTTP requests against _all_docs endpoint
    - Added HTTP 429 backoff with default settings
    - Simplified caught exception and message for schema size
    - Replaced scala http library with okhttp library for changes receiver
    - Updated streaming CloudantReceiver class to use improved ChangesRowScanner method
    - Replaced Play JSON with GSON library
    - Updated save operation to use `java-cloudant` bulk API
    - Use `_changes` feed filter option for Cloudant/CouchDB 2.x and greater

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/emlaver/bahir 154-java-cloudant-refactor

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/bahir/pull/61.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #61
    
----
commit 72bdc4d960fc85bf22db8fbd655b12346118998e
Author: Esteban Laver <em...@...>
Date:   2017-12-17T18:30:04Z

    Initial commit for java-cloudant rebase
    - Use java-cloudant’s executeRequest for HTTP requests against _all_docs endpoint
    - Added HTTP 429 backoff with default settings
    - Use java-cloudant view builders to get total doc limit
    - Simplified caught exception and message for schema size

commit eb4053fe013dec1f3538224dee692118200ed19c
Author: Esteban Laver <em...@...>
Date:   2017-12-09T04:55:01Z

    Replaced scala http library with okhttp library for changes receiver

commit 5f20a3fd8d7e26a0c8c6ef9200d4386eeec31036
Author: Esteban Laver <em...@...>
Date:   2017-12-17T17:54:47Z

    Updated streaming CloudantReceiver class to use improved ChangesRowScanner method

commit 1dc43cc89bedfb241c41acd5961cf13d741fab86
Author: Esteban Laver <em...@...>
Date:   2017-12-17T18:37:14Z

    Replaced all references of play json with GSON library
    - Updated save operation to use java-cloudant’s bulk API

commit 6520385e45634d23650c72a0431fe14d55762104
Author: Esteban Laver <em...@...>
Date:   2017-12-19T17:02:26Z

    Only use _changes feed filter option for Cloudant/CouchDB 2.x and greater

----


---

[GitHub] bahir pull request #61: [BAHIR-154] Refactor sql-cloudant to use Cloudant's ...

Posted by emlaver <gi...@git.apache.org>.
Github user emlaver commented on a diff in the pull request:

    https://github.com/apache/bahir/pull/61#discussion_r160077401
  
    --- Diff: sql-cloudant/src/main/scala/org/apache/bahir/cloudant/common/JsonStoreRDD.scala ---
    @@ -191,20 +199,30 @@ class JsonStoreRDD(sc: SparkContext, config: CloudantConfig)
        (0 until totalPartition).map(i => {
           val skip = i * limitPerPartition
           new JsonStoreRDDPartition(url, skip, limitPerPartition, i,
    -          config, selector, fields, queryUsed).asInstanceOf[Partition]
    +          config, selector, fields, queryUsed)
    +        .asInstanceOf[Partition]
         }).toArray
       }
     
       override def compute(splitIn: Partition, context: TaskContext):
           Iterator[String] = {
         val myPartition = splitIn.asInstanceOf[JsonStoreRDDPartition]
         implicit val postData : String = {
    +      val jsonObject = new JsonObject
           if (myPartition.queryUsed && myPartition.fields != null) {
    -        Json.stringify(Json.obj("selector" -> myPartition.selector, "fields" -> myPartition.fields,
    -            "limit" -> myPartition.limit, "skip" -> myPartition.skip))
    +        // Json.stringify(Json.obj("selector" -> myPartition.selector, "fields" ->
    +        // myPartition.fields, "limit" -> myPartition.limit, "skip" -> myPartition.skip))
    +        jsonObject.add("selector", myPartition.selector)
    +        jsonObject.add("fields", myPartition.fields)
    +        jsonObject.addProperty("skip", myPartition.skip)
    +        jsonObject.toString
           } else if (myPartition.queryUsed) {
    -        Json.stringify(Json.obj("selector" -> myPartition.selector, "limit" -> myPartition.limit,
    -            "skip" -> myPartition.skip))
    +        // Json.stringify(Json.obj("selector" -> myPartition.selector,
    +        // "limit" -> myPartition.limit, "skip" -> myPartition.skip))
    --- End diff --
    
    Removed in 55cc844.


---

[GitHub] bahir pull request #61: [BAHIR-154] Refactor sql-cloudant to use Cloudant's ...

Posted by ricellis <gi...@git.apache.org>.
Github user ricellis commented on a diff in the pull request:

    https://github.com/apache/bahir/pull/61#discussion_r159889840
  
    --- Diff: sql-cloudant/src/main/scala/org/apache/bahir/cloudant/common/JsonStoreRDD.scala ---
    @@ -191,20 +199,30 @@ class JsonStoreRDD(sc: SparkContext, config: CloudantConfig)
        (0 until totalPartition).map(i => {
           val skip = i * limitPerPartition
           new JsonStoreRDDPartition(url, skip, limitPerPartition, i,
    -          config, selector, fields, queryUsed).asInstanceOf[Partition]
    +          config, selector, fields, queryUsed)
    +        .asInstanceOf[Partition]
         }).toArray
       }
     
       override def compute(splitIn: Partition, context: TaskContext):
           Iterator[String] = {
         val myPartition = splitIn.asInstanceOf[JsonStoreRDDPartition]
         implicit val postData : String = {
    +      val jsonObject = new JsonObject
           if (myPartition.queryUsed && myPartition.fields != null) {
    -        Json.stringify(Json.obj("selector" -> myPartition.selector, "fields" -> myPartition.fields,
    -            "limit" -> myPartition.limit, "skip" -> myPartition.skip))
    +        // Json.stringify(Json.obj("selector" -> myPartition.selector, "fields" ->
    +        // myPartition.fields, "limit" -> myPartition.limit, "skip" -> myPartition.skip))
    +        jsonObject.add("selector", myPartition.selector)
    +        jsonObject.add("fields", myPartition.fields)
    +        jsonObject.addProperty("skip", myPartition.skip)
    +        jsonObject.toString
           } else if (myPartition.queryUsed) {
    -        Json.stringify(Json.obj("selector" -> myPartition.selector, "limit" -> myPartition.limit,
    -            "skip" -> myPartition.skip))
    +        // Json.stringify(Json.obj("selector" -> myPartition.selector,
    +        // "limit" -> myPartition.limit, "skip" -> myPartition.skip))
    --- End diff --
    
    remove


---

[GitHub] bahir pull request #61: [BAHIR-154] Refactor sql-cloudant to use Cloudant's ...

Posted by emlaver <gi...@git.apache.org>.
Github user emlaver commented on a diff in the pull request:

    https://github.com/apache/bahir/pull/61#discussion_r160077412
  
    --- Diff: sql-cloudant/src/main/scala/org/apache/bahir/cloudant/common/JsonStoreRDD.scala ---
    @@ -101,24 +105,24 @@ class JsonStoreRDD(sc: SparkContext, config: CloudantConfig)
           case LessThanOrEqual(attr, v) => ("$lte", v)
           case _ => (null, null)
         }
    -    val convertedV: JsValue = {
    +    val convertedV: JsonElement = {
           // TODO Better handing of other types
           if (value != null) {
             value match {
    -          case s: String => Json.toJson(s)
    -          case l: Long => Json.toJson(l)
    -          case d: Double => Json.toJson(d)
    -          case i: Int => Json.toJson(i)
    -          case b: Boolean => Json.toJson(b)
    -          case t: java.sql.Timestamp => Json.toJson(t)
    +          case s: String => parser.parse(s)// Json.toJson(s)
    --- End diff --
    
    Removed in 55cc844.


---

[GitHub] bahir issue #61: [BAHIR-154] Refactor sql-cloudant to use Cloudant's java-cl...

Posted by ApacheBahir <gi...@git.apache.org>.
Github user ApacheBahir commented on the issue:

    https://github.com/apache/bahir/pull/61
  
    Can one of the admins verify this patch?


---

[GitHub] bahir issue #61: [BAHIR-154] Refactor sql-cloudant to use Cloudant's java-cl...

Posted by lresende <gi...@git.apache.org>.
Github user lresende commented on the issue:

    https://github.com/apache/bahir/pull/61
  
    ok to test


---

[GitHub] bahir pull request #61: [BAHIR-154] Refactor sql-cloudant to use Cloudant's ...

Posted by ricellis <gi...@git.apache.org>.
Github user ricellis commented on a diff in the pull request:

    https://github.com/apache/bahir/pull/61#discussion_r160470451
  
    --- Diff: sql-cloudant/src/main/scala/org/apache/bahir/cloudant/CloudantConfig.scala ---
    @@ -16,34 +16,127 @@
      */
     package org.apache.bahir.cloudant
     
    -import java.net.URLEncoder
    +import java.net.{URL, URLEncoder}
     
    -import play.api.libs.json.{JsArray, JsObject, Json, JsValue}
    +import scala.collection.JavaConverters._
    +import scala.collection.mutable
    +import scala.reflect.io.File
    +
    +import com.cloudant.client.api.{ClientBuilder, CloudantClient, Database}
    +import com.cloudant.client.api.model.SearchResult
    +import com.cloudant.client.api.views._
    +import com.cloudant.http.{Http, HttpConnection}
    +import com.cloudant.http.interceptors.Replay429Interceptor
    +import com.google.gson.{JsonObject, JsonParser}
     
     import org.apache.bahir.cloudant.common._
    +import org.apache.bahir.cloudant.common.JsonUtil.JsonConverter
     
     /*
     * Only allow one field pushdown now
     * as the filter today does not tell how to link the filters out And v.s. Or
     */
     
     class CloudantConfig(val protocol: String, val host: String,
    -                     val dbName: String, val indexName: String, val viewName: String)
    +                     val dbName: String, val indexPath: String, val viewPath: String)
                         (implicit val username: String, val password: String,
                          val partitions: Int, val maxInPartition: Int, val minInPartition: Int,
                          val requestTimeout: Long, val bulkSize: Int, val schemaSampleSize: Int,
                          val createDBOnSave: Boolean, val endpoint: String,
                          val useQuery: Boolean = false, val queryLimit: Int)
       extends Serializable {
     
    +  @transient private lazy val client: CloudantClient = ClientBuilder
    +    .url(getClientUrl)
    +    .username(username)
    +    .password(password)
    +    .interceptors(Replay429Interceptor.WITH_DEFAULTS)
    --- End diff --
    
    Also the defaults may be insufficient for some use cases. It may be worth exposing config option(s) for at least the number of retries if not the backoff.


---

[GitHub] bahir pull request #61: [BAHIR-154] Refactor sql-cloudant to use Cloudant's ...

Posted by ricellis <gi...@git.apache.org>.
Github user ricellis commented on a diff in the pull request:

    https://github.com/apache/bahir/pull/61#discussion_r159699042
  
    --- Diff: sql-cloudant/src/main/scala/org/apache/bahir/cloudant/CloudantConfig.scala ---
    @@ -16,34 +16,127 @@
      */
     package org.apache.bahir.cloudant
     
    -import java.net.URLEncoder
    +import java.net.{URL, URLEncoder}
     
    -import play.api.libs.json.{JsArray, JsObject, Json, JsValue}
    +import scala.collection.JavaConverters._
    +import scala.collection.mutable
    +import scala.reflect.io.File
    +
    +import com.cloudant.client.api.{ClientBuilder, CloudantClient, Database}
    +import com.cloudant.client.api.model.SearchResult
    +import com.cloudant.client.api.views._
    +import com.cloudant.http.{Http, HttpConnection}
    +import com.cloudant.http.interceptors.Replay429Interceptor
    +import com.google.gson.{JsonObject, JsonParser}
     
     import org.apache.bahir.cloudant.common._
    +import org.apache.bahir.cloudant.common.JsonUtil.JsonConverter
     
     /*
     * Only allow one field pushdown now
     * as the filter today does not tell how to link the filters out And v.s. Or
     */
     
     class CloudantConfig(val protocol: String, val host: String,
    -                     val dbName: String, val indexName: String, val viewName: String)
    +                     val dbName: String, val indexPath: String, val viewPath: String)
                         (implicit val username: String, val password: String,
                          val partitions: Int, val maxInPartition: Int, val minInPartition: Int,
                          val requestTimeout: Long, val bulkSize: Int, val schemaSampleSize: Int,
                          val createDBOnSave: Boolean, val endpoint: String,
                          val useQuery: Boolean = false, val queryLimit: Int)
       extends Serializable {
     
    +  @transient private lazy val client: CloudantClient = ClientBuilder
    +    .url(getClientUrl)
    +    .username(username)
    +    .password(password)
    +    .interceptors(Replay429Interceptor.WITH_DEFAULTS)
    +    .build
    +  @transient private lazy val database: Database = client.database(dbName, false)
       lazy val dbUrl: String = {protocol + "://" + host + "/" + dbName}
    +  lazy val designDoc: String = {
    +    if (viewPath != null && viewPath.nonEmpty) {
    +      viewPath.split("/")(1)
    +    } else {
    +    null
    +    }
    +  }
    +  lazy val searchName: String = {
    +    // verify that the index path matches '_design/ddoc/_search/searchname'
    +    if (indexPath != null && indexPath.nonEmpty && indexPath.matches("\\w+\\/\\w+\\/\\w+\\/\\w+")) {
    --- End diff --
    
    I think you could leverage Scala's raw string interpolator here to make the regex a bit less escape-y:
    `raw"\w+\/\w+\/\w+\/\w+"`
    
    Also, are word characters sufficient here? Aren't there some other characters that could be part of a design document id and allowed unencoded in a URL path (e.g. maybe `@`?) that would be excluded by this regex? I'd be inclined to maybe use something like: `_design\/[^\/]+\/_search\/[^\/]+`


---

[GitHub] bahir pull request #61: [BAHIR-154] Refactor sql-cloudant to use Cloudant's ...

Posted by ricellis <gi...@git.apache.org>.
Github user ricellis commented on a diff in the pull request:

    https://github.com/apache/bahir/pull/61#discussion_r159845874
  
    --- Diff: sql-cloudant/src/main/scala/org/apache/bahir/cloudant/CloudantConfig.scala ---
    @@ -16,34 +16,127 @@
      */
     package org.apache.bahir.cloudant
     
    -import java.net.URLEncoder
    +import java.net.{URL, URLEncoder}
     
    -import play.api.libs.json.{JsArray, JsObject, Json, JsValue}
    +import scala.collection.JavaConverters._
    +import scala.collection.mutable
    +import scala.reflect.io.File
    +
    +import com.cloudant.client.api.{ClientBuilder, CloudantClient, Database}
    +import com.cloudant.client.api.model.SearchResult
    +import com.cloudant.client.api.views._
    +import com.cloudant.http.{Http, HttpConnection}
    +import com.cloudant.http.interceptors.Replay429Interceptor
    +import com.google.gson.{JsonObject, JsonParser}
     
     import org.apache.bahir.cloudant.common._
    +import org.apache.bahir.cloudant.common.JsonUtil.JsonConverter
     
     /*
     * Only allow one field pushdown now
     * as the filter today does not tell how to link the filters out And v.s. Or
     */
     
     class CloudantConfig(val protocol: String, val host: String,
    -                     val dbName: String, val indexName: String, val viewName: String)
    +                     val dbName: String, val indexPath: String, val viewPath: String)
                         (implicit val username: String, val password: String,
                          val partitions: Int, val maxInPartition: Int, val minInPartition: Int,
                          val requestTimeout: Long, val bulkSize: Int, val schemaSampleSize: Int,
                          val createDBOnSave: Boolean, val endpoint: String,
                          val useQuery: Boolean = false, val queryLimit: Int)
       extends Serializable {
     
    +  @transient private lazy val client: CloudantClient = ClientBuilder
    +    .url(getClientUrl)
    +    .username(username)
    +    .password(password)
    +    .interceptors(Replay429Interceptor.WITH_DEFAULTS)
    +    .build
    +  @transient private lazy val database: Database = client.database(dbName, false)
       lazy val dbUrl: String = {protocol + "://" + host + "/" + dbName}
    +  lazy val designDoc: String = {
    +    if (viewPath != null && viewPath.nonEmpty) {
    +      viewPath.split("/")(1)
    +    } else {
    +    null
    +    }
    +  }
    +  lazy val searchName: String = {
    +    // verify that the index path matches '_design/ddoc/_search/searchname'
    +    if (indexPath != null && indexPath.nonEmpty && indexPath.matches("\\w+\\/\\w+\\/\\w+\\/\\w+")) {
    +      val splitPath = indexPath.split(File.separator)
    +      // return 'design-doc/search-name'
    +      splitPath(1) + File.separator + splitPath(3)
    +    } else {
    +      null
    +    }
    +  }
    +  lazy val viewName: String = {
    +    if (viewPath != null && viewPath.nonEmpty) {
    +      val splitViewPath = viewPath.split(File.separator)
    +      if(splitViewPath(3).contains("?")) {
    +        splitViewPath(3).substring(0, splitViewPath(3).indexOf("?"))
    +      } else {
    +        splitViewPath(3)
    +      }
    +    } else {
    +      null
    +    }
    +  }
     
       val pkField = "_id"
       val defaultIndex: String = endpoint
       val default_filter: String = "*:*"
     
    -  def getDbUrl: String = {
    -    dbUrl
    +  def buildAllDocsRequest(limit: Int, includeDocs: Boolean = true): AllDocsRequestBuilder = {
    +    var allDocsReq = database.getAllDocsRequestBuilder.includeDocs(includeDocs)
    +    if (limit != JsonStoreConfigManager.ALLDOCS_OR_CHANGES_LIMIT) {
    +      allDocsReq = allDocsReq.limit(limit)
    +    }
    +    allDocsReq
    +  }
    +
    +  def buildViewRequest(limit: Int, includeDocs: Boolean = true):
    +  UnpaginatedRequestBuilder[String, String] = {
    +    val viewReq = database.getViewRequestBuilder(designDoc, viewName)
    +      .newRequest(Key.Type.STRING, classOf[String])
    +      .includeDocs(includeDocs)
    +    if (limit != JsonStoreConfigManager.ALLDOCS_OR_CHANGES_LIMIT) {
    +      viewReq.limit(limit)
    +    }
    +    viewReq
    +  }
    +
    +  def buildSearchRequest(limit: Int): SearchResult[JsonObject] = {
    +    val searchReq = database.search(searchName)
    +    if (limit != JsonStoreConfigManager.ALLDOCS_OR_CHANGES_LIMIT) {
    +      searchReq.limit(limit)
    +    }
    +    searchReq.querySearchResult(default_filter, classOf[JsonObject])
    +  }
    +
    +  def executeRequest(stringUrl: String, postData: String = null): HttpConnection = {
    +    val url = new URL(stringUrl)
    +    if(postData != null) {
    +      val conn = Http.POST(url, "application/json")
    +      conn.setRequestBody(postData)
    +      conn.requestProperties.put("Accept", "application/json")
    +      conn.requestProperties.put("User-Agent", "spark-cloudant")
    +      client.executeRequest(conn)
    +    } else {
    +      val conn = Http.GET(url)
    +      conn.requestProperties.put("Accept", "application/json")
    +      conn.requestProperties.put("User-Agent", "spark-cloudant")
    --- End diff --
    
    As above


---

[GitHub] bahir pull request #61: [BAHIR-154] Refactor sql-cloudant to use Cloudant's ...

Posted by ricellis <gi...@git.apache.org>.
Github user ricellis commented on a diff in the pull request:

    https://github.com/apache/bahir/pull/61#discussion_r159845032
  
    --- Diff: sql-cloudant/src/main/scala/org/apache/bahir/cloudant/CloudantConfig.scala ---
    @@ -16,34 +16,127 @@
      */
     package org.apache.bahir.cloudant
     
    -import java.net.URLEncoder
    +import java.net.{URL, URLEncoder}
     
    -import play.api.libs.json.{JsArray, JsObject, Json, JsValue}
    +import scala.collection.JavaConverters._
    +import scala.collection.mutable
    +import scala.reflect.io.File
    +
    +import com.cloudant.client.api.{ClientBuilder, CloudantClient, Database}
    +import com.cloudant.client.api.model.SearchResult
    +import com.cloudant.client.api.views._
    +import com.cloudant.http.{Http, HttpConnection}
    +import com.cloudant.http.interceptors.Replay429Interceptor
    +import com.google.gson.{JsonObject, JsonParser}
     
     import org.apache.bahir.cloudant.common._
    +import org.apache.bahir.cloudant.common.JsonUtil.JsonConverter
     
     /*
     * Only allow one field pushdown now
     * as the filter today does not tell how to link the filters out And v.s. Or
     */
     
     class CloudantConfig(val protocol: String, val host: String,
    -                     val dbName: String, val indexName: String, val viewName: String)
    +                     val dbName: String, val indexPath: String, val viewPath: String)
                         (implicit val username: String, val password: String,
                          val partitions: Int, val maxInPartition: Int, val minInPartition: Int,
                          val requestTimeout: Long, val bulkSize: Int, val schemaSampleSize: Int,
                          val createDBOnSave: Boolean, val endpoint: String,
                          val useQuery: Boolean = false, val queryLimit: Int)
       extends Serializable {
     
    +  @transient private lazy val client: CloudantClient = ClientBuilder
    +    .url(getClientUrl)
    +    .username(username)
    +    .password(password)
    +    .interceptors(Replay429Interceptor.WITH_DEFAULTS)
    --- End diff --
    
    It may be worth adding an additional interceptor here to change/augment the UA.


---

[GitHub] bahir pull request #61: [BAHIR-154] Refactor sql-cloudant to use Cloudant's ...

Posted by ricellis <gi...@git.apache.org>.
Github user ricellis commented on a diff in the pull request:

    https://github.com/apache/bahir/pull/61#discussion_r159845718
  
    --- Diff: sql-cloudant/src/main/scala/org/apache/bahir/cloudant/CloudantConfig.scala ---
    @@ -16,34 +16,127 @@
      */
     package org.apache.bahir.cloudant
     
    -import java.net.URLEncoder
    +import java.net.{URL, URLEncoder}
     
    -import play.api.libs.json.{JsArray, JsObject, Json, JsValue}
    +import scala.collection.JavaConverters._
    +import scala.collection.mutable
    +import scala.reflect.io.File
    +
    +import com.cloudant.client.api.{ClientBuilder, CloudantClient, Database}
    +import com.cloudant.client.api.model.SearchResult
    +import com.cloudant.client.api.views._
    +import com.cloudant.http.{Http, HttpConnection}
    +import com.cloudant.http.interceptors.Replay429Interceptor
    +import com.google.gson.{JsonObject, JsonParser}
     
     import org.apache.bahir.cloudant.common._
    +import org.apache.bahir.cloudant.common.JsonUtil.JsonConverter
     
     /*
     * Only allow one field pushdown now
     * as the filter today does not tell how to link the filters out And v.s. Or
     */
     
     class CloudantConfig(val protocol: String, val host: String,
    -                     val dbName: String, val indexName: String, val viewName: String)
    +                     val dbName: String, val indexPath: String, val viewPath: String)
                         (implicit val username: String, val password: String,
                          val partitions: Int, val maxInPartition: Int, val minInPartition: Int,
                          val requestTimeout: Long, val bulkSize: Int, val schemaSampleSize: Int,
                          val createDBOnSave: Boolean, val endpoint: String,
                          val useQuery: Boolean = false, val queryLimit: Int)
       extends Serializable {
     
    +  @transient private lazy val client: CloudantClient = ClientBuilder
    +    .url(getClientUrl)
    +    .username(username)
    +    .password(password)
    +    .interceptors(Replay429Interceptor.WITH_DEFAULTS)
    +    .build
    +  @transient private lazy val database: Database = client.database(dbName, false)
       lazy val dbUrl: String = {protocol + "://" + host + "/" + dbName}
    +  lazy val designDoc: String = {
    +    if (viewPath != null && viewPath.nonEmpty) {
    +      viewPath.split("/")(1)
    +    } else {
    +    null
    +    }
    +  }
    +  lazy val searchName: String = {
    +    // verify that the index path matches '_design/ddoc/_search/searchname'
    +    if (indexPath != null && indexPath.nonEmpty && indexPath.matches("\\w+\\/\\w+\\/\\w+\\/\\w+")) {
    +      val splitPath = indexPath.split(File.separator)
    +      // return 'design-doc/search-name'
    +      splitPath(1) + File.separator + splitPath(3)
    +    } else {
    +      null
    +    }
    +  }
    +  lazy val viewName: String = {
    +    if (viewPath != null && viewPath.nonEmpty) {
    +      val splitViewPath = viewPath.split(File.separator)
    +      if(splitViewPath(3).contains("?")) {
    +        splitViewPath(3).substring(0, splitViewPath(3).indexOf("?"))
    +      } else {
    +        splitViewPath(3)
    +      }
    +    } else {
    +      null
    +    }
    +  }
     
       val pkField = "_id"
       val defaultIndex: String = endpoint
       val default_filter: String = "*:*"
     
    -  def getDbUrl: String = {
    -    dbUrl
    +  def buildAllDocsRequest(limit: Int, includeDocs: Boolean = true): AllDocsRequestBuilder = {
    +    var allDocsReq = database.getAllDocsRequestBuilder.includeDocs(includeDocs)
    +    if (limit != JsonStoreConfigManager.ALLDOCS_OR_CHANGES_LIMIT) {
    +      allDocsReq = allDocsReq.limit(limit)
    +    }
    +    allDocsReq
    +  }
    +
    +  def buildViewRequest(limit: Int, includeDocs: Boolean = true):
    +  UnpaginatedRequestBuilder[String, String] = {
    +    val viewReq = database.getViewRequestBuilder(designDoc, viewName)
    +      .newRequest(Key.Type.STRING, classOf[String])
    +      .includeDocs(includeDocs)
    +    if (limit != JsonStoreConfigManager.ALLDOCS_OR_CHANGES_LIMIT) {
    +      viewReq.limit(limit)
    +    }
    +    viewReq
    +  }
    +
    +  def buildSearchRequest(limit: Int): SearchResult[JsonObject] = {
    +    val searchReq = database.search(searchName)
    +    if (limit != JsonStoreConfigManager.ALLDOCS_OR_CHANGES_LIMIT) {
    +      searchReq.limit(limit)
    +    }
    +    searchReq.querySearchResult(default_filter, classOf[JsonObject])
    +  }
    +
    +  def executeRequest(stringUrl: String, postData: String = null): HttpConnection = {
    +    val url = new URL(stringUrl)
    +    if(postData != null) {
    +      val conn = Http.POST(url, "application/json")
    +      conn.setRequestBody(postData)
    +      conn.requestProperties.put("Accept", "application/json")
    --- End diff --
    
    Is this necessary? CouchDB will default to sending back JSON anyway won't it?


---

[GitHub] bahir pull request #61: [BAHIR-154] Refactor sql-cloudant to use Cloudant's ...

Posted by emlaver <gi...@git.apache.org>.
Github user emlaver commented on a diff in the pull request:

    https://github.com/apache/bahir/pull/61#discussion_r160077429
  
    --- Diff: sql-cloudant/src/main/scala/org/apache/bahir/cloudant/CloudantConfig.scala ---
    @@ -16,34 +16,127 @@
      */
     package org.apache.bahir.cloudant
     
    -import java.net.URLEncoder
    +import java.net.{URL, URLEncoder}
     
    -import play.api.libs.json.{JsArray, JsObject, Json, JsValue}
    +import scala.collection.JavaConverters._
    +import scala.collection.mutable
    +import scala.reflect.io.File
    +
    +import com.cloudant.client.api.{ClientBuilder, CloudantClient, Database}
    +import com.cloudant.client.api.model.SearchResult
    +import com.cloudant.client.api.views._
    +import com.cloudant.http.{Http, HttpConnection}
    +import com.cloudant.http.interceptors.Replay429Interceptor
    +import com.google.gson.{JsonObject, JsonParser}
     
     import org.apache.bahir.cloudant.common._
    +import org.apache.bahir.cloudant.common.JsonUtil.JsonConverter
     
     /*
     * Only allow one field pushdown now
     * as the filter today does not tell how to link the filters out And v.s. Or
     */
     
     class CloudantConfig(val protocol: String, val host: String,
    -                     val dbName: String, val indexName: String, val viewName: String)
    +                     val dbName: String, val indexPath: String, val viewPath: String)
                         (implicit val username: String, val password: String,
                          val partitions: Int, val maxInPartition: Int, val minInPartition: Int,
                          val requestTimeout: Long, val bulkSize: Int, val schemaSampleSize: Int,
                          val createDBOnSave: Boolean, val endpoint: String,
                          val useQuery: Boolean = false, val queryLimit: Int)
       extends Serializable {
     
    +  @transient private lazy val client: CloudantClient = ClientBuilder
    +    .url(getClientUrl)
    +    .username(username)
    +    .password(password)
    +    .interceptors(Replay429Interceptor.WITH_DEFAULTS)
    +    .build
    +  @transient private lazy val database: Database = client.database(dbName, false)
       lazy val dbUrl: String = {protocol + "://" + host + "/" + dbName}
    +  lazy val designDoc: String = {
    +    if (viewPath != null && viewPath.nonEmpty) {
    +      viewPath.split("/")(1)
    +    } else {
    +    null
    +    }
    +  }
    +  lazy val searchName: String = {
    +    // verify that the index path matches '_design/ddoc/_search/searchname'
    +    if (indexPath != null && indexPath.nonEmpty && indexPath.matches("\\w+\\/\\w+\\/\\w+\\/\\w+")) {
    +      val splitPath = indexPath.split(File.separator)
    +      // return 'design-doc/search-name'
    +      splitPath(1) + File.separator + splitPath(3)
    +    } else {
    +      null
    +    }
    +  }
    +  lazy val viewName: String = {
    +    if (viewPath != null && viewPath.nonEmpty) {
    +      val splitViewPath = viewPath.split(File.separator)
    +      if(splitViewPath(3).contains("?")) {
    +        splitViewPath(3).substring(0, splitViewPath(3).indexOf("?"))
    +      } else {
    +        splitViewPath(3)
    +      }
    +    } else {
    +      null
    +    }
    +  }
     
       val pkField = "_id"
       val defaultIndex: String = endpoint
       val default_filter: String = "*:*"
     
    -  def getDbUrl: String = {
    -    dbUrl
    +  def buildAllDocsRequest(limit: Int, includeDocs: Boolean = true): AllDocsRequestBuilder = {
    +    var allDocsReq = database.getAllDocsRequestBuilder.includeDocs(includeDocs)
    +    if (limit != JsonStoreConfigManager.ALLDOCS_OR_CHANGES_LIMIT) {
    +      allDocsReq = allDocsReq.limit(limit)
    +    }
    +    allDocsReq
    +  }
    +
    +  def buildViewRequest(limit: Int, includeDocs: Boolean = true):
    +  UnpaginatedRequestBuilder[String, String] = {
    +    val viewReq = database.getViewRequestBuilder(designDoc, viewName)
    +      .newRequest(Key.Type.STRING, classOf[String])
    +      .includeDocs(includeDocs)
    +    if (limit != JsonStoreConfigManager.ALLDOCS_OR_CHANGES_LIMIT) {
    +      viewReq.limit(limit)
    +    }
    +    viewReq
    +  }
    +
    +  def buildSearchRequest(limit: Int): SearchResult[JsonObject] = {
    +    val searchReq = database.search(searchName)
    +    if (limit != JsonStoreConfigManager.ALLDOCS_OR_CHANGES_LIMIT) {
    +      searchReq.limit(limit)
    +    }
    +    searchReq.querySearchResult(default_filter, classOf[JsonObject])
    +  }
    +
    +  def executeRequest(stringUrl: String, postData: String = null): HttpConnection = {
    +    val url = new URL(stringUrl)
    +    if(postData != null) {
    +      val conn = Http.POST(url, "application/json")
    +      conn.setRequestBody(postData)
    +      conn.requestProperties.put("Accept", "application/json")
    --- End diff --
    
    Removed in 55cc844.


---

[GitHub] bahir issue #61: [BAHIR-154] Refactor sql-cloudant to use Cloudant's java-cl...

Posted by ApacheBahir <gi...@git.apache.org>.
Github user ApacheBahir commented on the issue:

    https://github.com/apache/bahir/pull/61
  
    :red_circle: Build failed, see build log for details
     



---

[GitHub] bahir pull request #61: [BAHIR-154] Refactor sql-cloudant to use Cloudant's ...

Posted by ricellis <gi...@git.apache.org>.
Github user ricellis commented on a diff in the pull request:

    https://github.com/apache/bahir/pull/61#discussion_r159889644
  
    --- Diff: sql-cloudant/src/main/scala/org/apache/bahir/cloudant/common/JsonStoreRDD.scala ---
    @@ -174,7 +178,11 @@ class JsonStoreRDD(sc: SparkContext, config: CloudantConfig)
     
         implicit val postData : String = {
           if (queryUsed) {
    -        Json.stringify(Json.obj("selector" -> selector, "limit" -> 1))
    +        val jsonSelector = new JsonObject
    +        jsonSelector.addProperty("selector", selector.toString)
    +        jsonSelector.addProperty("limit", 1)
    +        jsonSelector.toString
    +        // Json.stringify(Json.obj("selector" -> selector, "limit" -> 1))
    --- End diff --
    
    remove


---

[GitHub] bahir pull request #61: [BAHIR-154] Refactor sql-cloudant to use Cloudant's ...

Posted by ricellis <gi...@git.apache.org>.
Github user ricellis commented on a diff in the pull request:

    https://github.com/apache/bahir/pull/61#discussion_r159845855
  
    --- Diff: sql-cloudant/src/main/scala/org/apache/bahir/cloudant/CloudantConfig.scala ---
    @@ -16,34 +16,127 @@
      */
     package org.apache.bahir.cloudant
     
    -import java.net.URLEncoder
    +import java.net.{URL, URLEncoder}
     
    -import play.api.libs.json.{JsArray, JsObject, Json, JsValue}
    +import scala.collection.JavaConverters._
    +import scala.collection.mutable
    +import scala.reflect.io.File
    +
    +import com.cloudant.client.api.{ClientBuilder, CloudantClient, Database}
    +import com.cloudant.client.api.model.SearchResult
    +import com.cloudant.client.api.views._
    +import com.cloudant.http.{Http, HttpConnection}
    +import com.cloudant.http.interceptors.Replay429Interceptor
    +import com.google.gson.{JsonObject, JsonParser}
     
     import org.apache.bahir.cloudant.common._
    +import org.apache.bahir.cloudant.common.JsonUtil.JsonConverter
     
     /*
     * Only allow one field pushdown now
     * as the filter today does not tell how to link the filters out And v.s. Or
     */
     
     class CloudantConfig(val protocol: String, val host: String,
    -                     val dbName: String, val indexName: String, val viewName: String)
    +                     val dbName: String, val indexPath: String, val viewPath: String)
                         (implicit val username: String, val password: String,
                          val partitions: Int, val maxInPartition: Int, val minInPartition: Int,
                          val requestTimeout: Long, val bulkSize: Int, val schemaSampleSize: Int,
                          val createDBOnSave: Boolean, val endpoint: String,
                          val useQuery: Boolean = false, val queryLimit: Int)
       extends Serializable {
     
    +  @transient private lazy val client: CloudantClient = ClientBuilder
    +    .url(getClientUrl)
    +    .username(username)
    +    .password(password)
    +    .interceptors(Replay429Interceptor.WITH_DEFAULTS)
    +    .build
    +  @transient private lazy val database: Database = client.database(dbName, false)
       lazy val dbUrl: String = {protocol + "://" + host + "/" + dbName}
    +  lazy val designDoc: String = {
    +    if (viewPath != null && viewPath.nonEmpty) {
    +      viewPath.split("/")(1)
    +    } else {
    +    null
    +    }
    +  }
    +  lazy val searchName: String = {
    +    // verify that the index path matches '_design/ddoc/_search/searchname'
    +    if (indexPath != null && indexPath.nonEmpty && indexPath.matches("\\w+\\/\\w+\\/\\w+\\/\\w+")) {
    +      val splitPath = indexPath.split(File.separator)
    +      // return 'design-doc/search-name'
    +      splitPath(1) + File.separator + splitPath(3)
    +    } else {
    +      null
    +    }
    +  }
    +  lazy val viewName: String = {
    +    if (viewPath != null && viewPath.nonEmpty) {
    +      val splitViewPath = viewPath.split(File.separator)
    +      if(splitViewPath(3).contains("?")) {
    +        splitViewPath(3).substring(0, splitViewPath(3).indexOf("?"))
    +      } else {
    +        splitViewPath(3)
    +      }
    +    } else {
    +      null
    +    }
    +  }
     
       val pkField = "_id"
       val defaultIndex: String = endpoint
       val default_filter: String = "*:*"
     
    -  def getDbUrl: String = {
    -    dbUrl
    +  def buildAllDocsRequest(limit: Int, includeDocs: Boolean = true): AllDocsRequestBuilder = {
    +    var allDocsReq = database.getAllDocsRequestBuilder.includeDocs(includeDocs)
    +    if (limit != JsonStoreConfigManager.ALLDOCS_OR_CHANGES_LIMIT) {
    +      allDocsReq = allDocsReq.limit(limit)
    +    }
    +    allDocsReq
    +  }
    +
    +  def buildViewRequest(limit: Int, includeDocs: Boolean = true):
    +  UnpaginatedRequestBuilder[String, String] = {
    +    val viewReq = database.getViewRequestBuilder(designDoc, viewName)
    +      .newRequest(Key.Type.STRING, classOf[String])
    +      .includeDocs(includeDocs)
    +    if (limit != JsonStoreConfigManager.ALLDOCS_OR_CHANGES_LIMIT) {
    +      viewReq.limit(limit)
    +    }
    +    viewReq
    +  }
    +
    +  def buildSearchRequest(limit: Int): SearchResult[JsonObject] = {
    +    val searchReq = database.search(searchName)
    +    if (limit != JsonStoreConfigManager.ALLDOCS_OR_CHANGES_LIMIT) {
    +      searchReq.limit(limit)
    +    }
    +    searchReq.querySearchResult(default_filter, classOf[JsonObject])
    +  }
    +
    +  def executeRequest(stringUrl: String, postData: String = null): HttpConnection = {
    +    val url = new URL(stringUrl)
    +    if(postData != null) {
    +      val conn = Http.POST(url, "application/json")
    +      conn.setRequestBody(postData)
    +      conn.requestProperties.put("Accept", "application/json")
    +      conn.requestProperties.put("User-Agent", "spark-cloudant")
    +      client.executeRequest(conn)
    +    } else {
    +      val conn = Http.GET(url)
    +      conn.requestProperties.put("Accept", "application/json")
    --- End diff --
    
    As above


---

[GitHub] bahir issue #61: [BAHIR-154] Refactor sql-cloudant to use Cloudant's java-cl...

Posted by emlaver <gi...@git.apache.org>.
Github user emlaver commented on the issue:

    https://github.com/apache/bahir/pull/61
  
    @lresende PR rebased and fixed error message after rebasing #60.


---

[GitHub] bahir pull request #61: [BAHIR-154] Refactor sql-cloudant to use Cloudant's ...

Posted by ricellis <gi...@git.apache.org>.
Github user ricellis commented on a diff in the pull request:

    https://github.com/apache/bahir/pull/61#discussion_r159890761
  
    --- Diff: sql-cloudant/src/main/scala/org/apache/bahir/cloudant/common/JsonStoreRDD.scala ---
    @@ -191,20 +199,30 @@ class JsonStoreRDD(sc: SparkContext, config: CloudantConfig)
        (0 until totalPartition).map(i => {
           val skip = i * limitPerPartition
           new JsonStoreRDDPartition(url, skip, limitPerPartition, i,
    -          config, selector, fields, queryUsed).asInstanceOf[Partition]
    +          config, selector, fields, queryUsed)
    +        .asInstanceOf[Partition]
         }).toArray
       }
     
       override def compute(splitIn: Partition, context: TaskContext):
           Iterator[String] = {
         val myPartition = splitIn.asInstanceOf[JsonStoreRDDPartition]
         implicit val postData : String = {
    +      val jsonObject = new JsonObject
           if (myPartition.queryUsed && myPartition.fields != null) {
    -        Json.stringify(Json.obj("selector" -> myPartition.selector, "fields" -> myPartition.fields,
    -            "limit" -> myPartition.limit, "skip" -> myPartition.skip))
    +        // Json.stringify(Json.obj("selector" -> myPartition.selector, "fields" ->
    +        // myPartition.fields, "limit" -> myPartition.limit, "skip" -> myPartition.skip))
    +        jsonObject.add("selector", myPartition.selector)
    +        jsonObject.add("fields", myPartition.fields)
    +        jsonObject.addProperty("skip", myPartition.skip)
    +        jsonObject.toString
           } else if (myPartition.queryUsed) {
    -        Json.stringify(Json.obj("selector" -> myPartition.selector, "limit" -> myPartition.limit,
    -            "skip" -> myPartition.skip))
    +        // Json.stringify(Json.obj("selector" -> myPartition.selector,
    +        // "limit" -> myPartition.limit, "skip" -> myPartition.skip))
    +        jsonObject.add("selector", myPartition.selector)
    --- End diff --
    
    Could add the `selector` and `skip` before the `if`/`else` and avoid the duplication


---

[GitHub] bahir pull request #61: [BAHIR-154] Refactor sql-cloudant to use Cloudant's ...

Posted by emlaver <gi...@git.apache.org>.
Github user emlaver commented on a diff in the pull request:

    https://github.com/apache/bahir/pull/61#discussion_r159814009
  
    --- Diff: sql-cloudant/src/main/scala/org/apache/bahir/cloudant/CloudantConfig.scala ---
    @@ -16,34 +16,127 @@
      */
     package org.apache.bahir.cloudant
     
    -import java.net.URLEncoder
    +import java.net.{URL, URLEncoder}
     
    -import play.api.libs.json.{JsArray, JsObject, Json, JsValue}
    +import scala.collection.JavaConverters._
    +import scala.collection.mutable
    +import scala.reflect.io.File
    +
    +import com.cloudant.client.api.{ClientBuilder, CloudantClient, Database}
    +import com.cloudant.client.api.model.SearchResult
    +import com.cloudant.client.api.views._
    +import com.cloudant.http.{Http, HttpConnection}
    +import com.cloudant.http.interceptors.Replay429Interceptor
    +import com.google.gson.{JsonObject, JsonParser}
     
     import org.apache.bahir.cloudant.common._
    +import org.apache.bahir.cloudant.common.JsonUtil.JsonConverter
     
     /*
     * Only allow one field pushdown now
     * as the filter today does not tell how to link the filters out And v.s. Or
     */
     
     class CloudantConfig(val protocol: String, val host: String,
    -                     val dbName: String, val indexName: String, val viewName: String)
    +                     val dbName: String, val indexPath: String, val viewPath: String)
                         (implicit val username: String, val password: String,
                          val partitions: Int, val maxInPartition: Int, val minInPartition: Int,
                          val requestTimeout: Long, val bulkSize: Int, val schemaSampleSize: Int,
                          val createDBOnSave: Boolean, val endpoint: String,
                          val useQuery: Boolean = false, val queryLimit: Int)
       extends Serializable {
     
    +  @transient private lazy val client: CloudantClient = ClientBuilder
    +    .url(getClientUrl)
    +    .username(username)
    +    .password(password)
    +    .interceptors(Replay429Interceptor.WITH_DEFAULTS)
    +    .build
    +  @transient private lazy val database: Database = client.database(dbName, false)
       lazy val dbUrl: String = {protocol + "://" + host + "/" + dbName}
    +  lazy val designDoc: String = {
    +    if (viewPath != null && viewPath.nonEmpty) {
    +      viewPath.split("/")(1)
    +    } else {
    +    null
    +    }
    +  }
    +  lazy val searchName: String = {
    +    // verify that the index path matches '_design/ddoc/_search/searchname'
    +    if (indexPath != null && indexPath.nonEmpty && indexPath.matches("\\w+\\/\\w+\\/\\w+\\/\\w+")) {
    --- End diff --
    
    This is a great idea and I'll use this for both view and search.


---

[GitHub] bahir pull request #61: [BAHIR-154] Refactor sql-cloudant to use Cloudant's ...

Posted by ricellis <gi...@git.apache.org>.
Github user ricellis commented on a diff in the pull request:

    https://github.com/apache/bahir/pull/61#discussion_r161270944
  
    --- Diff: sql-cloudant/src/main/scala/org/apache/bahir/cloudant/common/JsonStoreDataAccess.scala ---
    @@ -58,35 +54,30 @@ class JsonStoreDataAccess (config: CloudantConfig)  {
         }
       }
     
    -  def getAll[T](url: String)
    -      (implicit columns: Array[String] = null): Seq[String] = {
    -    this.getQueryResult[Seq[String]](url, processAll)
    -  }
    -
       def getIterator(skip: Int, limit: Int, url: String)
           (implicit columns: Array[String] = null,
           postData: String = null): Iterator[String] = {
    +    logger.info(s"Loading data from Cloudant using: $url , postData: $postData")
         val newUrl = config.getSubSetUrl(url, skip, limit, postData != null)
         this.getQueryResult[Iterator[String]](newUrl, processIterator)
       }
     
       def getTotalRows(url: String, queryUsed: Boolean)
    -      (implicit postData: String = null): Int = {
    -      if (queryUsed) config.queryLimit // Query can not retrieve total row now.
    -      else {
    -        val totalUrl = config.getTotalUrl(url)
    -        this.getQueryResult[Int](totalUrl,
    -          { result => config.getTotalRows(Json.parse(result))})
    -      }
    +                  (implicit postData: String = null): Int = {
    --- End diff --
    
    Is this a bit too indented?


---

[GitHub] bahir pull request #61: [BAHIR-154] Refactor sql-cloudant to use Cloudant's ...

Posted by ricellis <gi...@git.apache.org>.
Github user ricellis commented on a diff in the pull request:

    https://github.com/apache/bahir/pull/61#discussion_r159848274
  
    --- Diff: sql-cloudant/src/main/scala/org/apache/bahir/cloudant/CloudantConfig.scala ---
    @@ -95,15 +189,49 @@ class CloudantConfig(val protocol: String, val host: String,
         }
       }
     
    +  def getTotalDocCount: Int = {
    +    val limit = 1
    +    if (viewPath != null) {
    +      // "limit=" + limit + "&skip=" + skip
    +      buildViewRequest(limit, includeDocs = false).build().getResponse.getTotalRowCount.toInt
    +    } else {
    +      // /_all_docs?limit=1
    +      // Note: java-cloudant's AllDocsRequest doesn't have a getTotalRowCount method
    +      // buildAllDocsRequest(1, includeDocs = false).build().getResponse.getTotalRowCount.toInt
    +      val response = client.executeRequest(Http.GET(
    +        new URL(database.getDBUri + File.separator + endpoint + "?limit=" + limit)))
    +      getResultTotalRows(response.responseAsString)
    +    }
    +  }
    +
    +  def getDocs(limit: Int): List[JsonObject] = {
    +    if (viewPath != null) {
    +      // "limit=" + limit + "&skip=" + skip
    --- End diff --
    
    remove?


---

[GitHub] bahir pull request #61: [BAHIR-154] Refactor sql-cloudant to use Cloudant's ...

Posted by ricellis <gi...@git.apache.org>.
Github user ricellis commented on a diff in the pull request:

    https://github.com/apache/bahir/pull/61#discussion_r159845769
  
    --- Diff: sql-cloudant/src/main/scala/org/apache/bahir/cloudant/CloudantConfig.scala ---
    @@ -16,34 +16,127 @@
      */
     package org.apache.bahir.cloudant
     
    -import java.net.URLEncoder
    +import java.net.{URL, URLEncoder}
     
    -import play.api.libs.json.{JsArray, JsObject, Json, JsValue}
    +import scala.collection.JavaConverters._
    +import scala.collection.mutable
    +import scala.reflect.io.File
    +
    +import com.cloudant.client.api.{ClientBuilder, CloudantClient, Database}
    +import com.cloudant.client.api.model.SearchResult
    +import com.cloudant.client.api.views._
    +import com.cloudant.http.{Http, HttpConnection}
    +import com.cloudant.http.interceptors.Replay429Interceptor
    +import com.google.gson.{JsonObject, JsonParser}
     
     import org.apache.bahir.cloudant.common._
    +import org.apache.bahir.cloudant.common.JsonUtil.JsonConverter
     
     /*
     * Only allow one field pushdown now
     * as the filter today does not tell how to link the filters out And v.s. Or
     */
     
     class CloudantConfig(val protocol: String, val host: String,
    -                     val dbName: String, val indexName: String, val viewName: String)
    +                     val dbName: String, val indexPath: String, val viewPath: String)
                         (implicit val username: String, val password: String,
                          val partitions: Int, val maxInPartition: Int, val minInPartition: Int,
                          val requestTimeout: Long, val bulkSize: Int, val schemaSampleSize: Int,
                          val createDBOnSave: Boolean, val endpoint: String,
                          val useQuery: Boolean = false, val queryLimit: Int)
       extends Serializable {
     
    +  @transient private lazy val client: CloudantClient = ClientBuilder
    +    .url(getClientUrl)
    +    .username(username)
    +    .password(password)
    +    .interceptors(Replay429Interceptor.WITH_DEFAULTS)
    +    .build
    +  @transient private lazy val database: Database = client.database(dbName, false)
       lazy val dbUrl: String = {protocol + "://" + host + "/" + dbName}
    +  lazy val designDoc: String = {
    +    if (viewPath != null && viewPath.nonEmpty) {
    +      viewPath.split("/")(1)
    +    } else {
    +    null
    +    }
    +  }
    +  lazy val searchName: String = {
    +    // verify that the index path matches '_design/ddoc/_search/searchname'
    +    if (indexPath != null && indexPath.nonEmpty && indexPath.matches("\\w+\\/\\w+\\/\\w+\\/\\w+")) {
    +      val splitPath = indexPath.split(File.separator)
    +      // return 'design-doc/search-name'
    +      splitPath(1) + File.separator + splitPath(3)
    +    } else {
    +      null
    +    }
    +  }
    +  lazy val viewName: String = {
    +    if (viewPath != null && viewPath.nonEmpty) {
    +      val splitViewPath = viewPath.split(File.separator)
    +      if(splitViewPath(3).contains("?")) {
    +        splitViewPath(3).substring(0, splitViewPath(3).indexOf("?"))
    +      } else {
    +        splitViewPath(3)
    +      }
    +    } else {
    +      null
    +    }
    +  }
     
       val pkField = "_id"
       val defaultIndex: String = endpoint
       val default_filter: String = "*:*"
     
    -  def getDbUrl: String = {
    -    dbUrl
    +  def buildAllDocsRequest(limit: Int, includeDocs: Boolean = true): AllDocsRequestBuilder = {
    +    var allDocsReq = database.getAllDocsRequestBuilder.includeDocs(includeDocs)
    +    if (limit != JsonStoreConfigManager.ALLDOCS_OR_CHANGES_LIMIT) {
    +      allDocsReq = allDocsReq.limit(limit)
    +    }
    +    allDocsReq
    +  }
    +
    +  def buildViewRequest(limit: Int, includeDocs: Boolean = true):
    +  UnpaginatedRequestBuilder[String, String] = {
    +    val viewReq = database.getViewRequestBuilder(designDoc, viewName)
    +      .newRequest(Key.Type.STRING, classOf[String])
    +      .includeDocs(includeDocs)
    +    if (limit != JsonStoreConfigManager.ALLDOCS_OR_CHANGES_LIMIT) {
    +      viewReq.limit(limit)
    +    }
    +    viewReq
    +  }
    +
    +  def buildSearchRequest(limit: Int): SearchResult[JsonObject] = {
    +    val searchReq = database.search(searchName)
    +    if (limit != JsonStoreConfigManager.ALLDOCS_OR_CHANGES_LIMIT) {
    +      searchReq.limit(limit)
    +    }
    +    searchReq.querySearchResult(default_filter, classOf[JsonObject])
    +  }
    +
    +  def executeRequest(stringUrl: String, postData: String = null): HttpConnection = {
    +    val url = new URL(stringUrl)
    +    if(postData != null) {
    +      val conn = Http.POST(url, "application/json")
    +      conn.setRequestBody(postData)
    +      conn.requestProperties.put("Accept", "application/json")
    +      conn.requestProperties.put("User-Agent", "spark-cloudant")
    --- End diff --
    
    If you add the interceptor as per earlier comment, this won't be needed.


---

[GitHub] bahir issue #61: [BAHIR-154] Refactor sql-cloudant to use Cloudant's java-cl...

Posted by ApacheBahir <gi...@git.apache.org>.
Github user ApacheBahir commented on the issue:

    https://github.com/apache/bahir/pull/61
  
    
    Refer to this link for build results (access rights to CI server needed): 
    http://169.45.79.58:8080/job/bahir_spark_pr_builder/149/



---

[GitHub] bahir pull request #61: [BAHIR-154] Refactor sql-cloudant to use Cloudant's ...

Posted by emlaver <gi...@git.apache.org>.
Github user emlaver commented on a diff in the pull request:

    https://github.com/apache/bahir/pull/61#discussion_r160077433
  
    --- Diff: sql-cloudant/src/main/scala/org/apache/bahir/cloudant/CloudantConfig.scala ---
    @@ -95,15 +189,49 @@ class CloudantConfig(val protocol: String, val host: String,
         }
       }
     
    +  def getTotalDocCount: Int = {
    +    val limit = 1
    +    if (viewPath != null) {
    +      // "limit=" + limit + "&skip=" + skip
    +      buildViewRequest(limit, includeDocs = false).build().getResponse.getTotalRowCount.toInt
    +    } else {
    +      // /_all_docs?limit=1
    +      // Note: java-cloudant's AllDocsRequest doesn't have a getTotalRowCount method
    +      // buildAllDocsRequest(1, includeDocs = false).build().getResponse.getTotalRowCount.toInt
    +      val response = client.executeRequest(Http.GET(
    +        new URL(database.getDBUri + File.separator + endpoint + "?limit=" + limit)))
    +      getResultTotalRows(response.responseAsString)
    +    }
    +  }
    +
    +  def getDocs(limit: Int): List[JsonObject] = {
    +    if (viewPath != null) {
    +      // "limit=" + limit + "&skip=" + skip
    +      buildViewRequest(limit).build().getResponse.getDocsAs(classOf[JsonObject]).asScala.toList
    +    } else if (indexPath != null) {
    +      var searchDocs = mutable.ListBuffer[JsonObject]()
    +      for (result: SearchResult[JsonObject]#SearchResultRow <-
    +           buildSearchRequest(limit).getRows.asScala) {
    +        searchDocs += result.getDoc
    +      }
    +      searchDocs.toList
    +    } else {
    +      // /_all_docs?limit=1
    +      // val response = client.executeRequest(Http.GET(
    --- End diff --
    
    Removed in 55cc844.


---

[GitHub] bahir pull request #61: [BAHIR-154] Refactor sql-cloudant to use Cloudant's ...

Posted by ricellis <gi...@git.apache.org>.
Github user ricellis commented on a diff in the pull request:

    https://github.com/apache/bahir/pull/61#discussion_r159699798
  
    --- Diff: sql-cloudant/src/main/scala/org/apache/bahir/cloudant/CloudantConfig.scala ---
    @@ -16,34 +16,127 @@
      */
     package org.apache.bahir.cloudant
     
    -import java.net.URLEncoder
    +import java.net.{URL, URLEncoder}
     
    -import play.api.libs.json.{JsArray, JsObject, Json, JsValue}
    +import scala.collection.JavaConverters._
    +import scala.collection.mutable
    +import scala.reflect.io.File
    +
    +import com.cloudant.client.api.{ClientBuilder, CloudantClient, Database}
    +import com.cloudant.client.api.model.SearchResult
    +import com.cloudant.client.api.views._
    +import com.cloudant.http.{Http, HttpConnection}
    +import com.cloudant.http.interceptors.Replay429Interceptor
    +import com.google.gson.{JsonObject, JsonParser}
     
     import org.apache.bahir.cloudant.common._
    +import org.apache.bahir.cloudant.common.JsonUtil.JsonConverter
     
     /*
     * Only allow one field pushdown now
     * as the filter today does not tell how to link the filters out And v.s. Or
     */
     
     class CloudantConfig(val protocol: String, val host: String,
    -                     val dbName: String, val indexName: String, val viewName: String)
    +                     val dbName: String, val indexPath: String, val viewPath: String)
                         (implicit val username: String, val password: String,
                          val partitions: Int, val maxInPartition: Int, val minInPartition: Int,
                          val requestTimeout: Long, val bulkSize: Int, val schemaSampleSize: Int,
                          val createDBOnSave: Boolean, val endpoint: String,
                          val useQuery: Boolean = false, val queryLimit: Int)
       extends Serializable {
     
    +  @transient private lazy val client: CloudantClient = ClientBuilder
    +    .url(getClientUrl)
    +    .username(username)
    +    .password(password)
    +    .interceptors(Replay429Interceptor.WITH_DEFAULTS)
    +    .build
    +  @transient private lazy val database: Database = client.database(dbName, false)
       lazy val dbUrl: String = {protocol + "://" + host + "/" + dbName}
    +  lazy val designDoc: String = {
    +    if (viewPath != null && viewPath.nonEmpty) {
    +      viewPath.split("/")(1)
    +    } else {
    +    null
    +    }
    +  }
    +  lazy val searchName: String = {
    +    // verify that the index path matches '_design/ddoc/_search/searchname'
    +    if (indexPath != null && indexPath.nonEmpty && indexPath.matches("\\w+\\/\\w+\\/\\w+\\/\\w+")) {
    +      val splitPath = indexPath.split(File.separator)
    --- End diff --
    
    If you separate out the regex pattern from earlier you could use the captured groups to extract the design doc ID and search index name without needing to do more splits here.


---

[GitHub] bahir issue #61: [BAHIR-154] Refactor sql-cloudant to use Cloudant's java-cl...

Posted by ApacheBahir <gi...@git.apache.org>.
Github user ApacheBahir commented on the issue:

    https://github.com/apache/bahir/pull/61
  
    :white_check_mark: Build successful
     



---

[GitHub] bahir pull request #61: [BAHIR-154] Refactor sql-cloudant to use Cloudant's ...

Posted by ricellis <gi...@git.apache.org>.
Github user ricellis commented on a diff in the pull request:

    https://github.com/apache/bahir/pull/61#discussion_r159709382
  
    --- Diff: sql-cloudant/src/main/scala/org/apache/bahir/cloudant/CloudantConfig.scala ---
    @@ -95,15 +189,49 @@ class CloudantConfig(val protocol: String, val host: String,
         }
       }
     
    +  def getTotalDocCount: Int = {
    +    val limit = 1
    +    if (viewPath != null) {
    +      // "limit=" + limit + "&skip=" + skip
    +      buildViewRequest(limit, includeDocs = false).build().getResponse.getTotalRowCount.toInt
    +    } else {
    +      // /_all_docs?limit=1
    +      // Note: java-cloudant's AllDocsRequest doesn't have a getTotalRowCount method
    +      // buildAllDocsRequest(1, includeDocs = false).build().getResponse.getTotalRowCount.toInt
    +      val response = client.executeRequest(Http.GET(
    +        new URL(database.getDBUri + File.separator + endpoint + "?limit=" + limit)))
    +      getResultTotalRows(response.responseAsString)
    --- End diff --
    
    It might be easier to use
    `com.cloudant.client.api.model.DbInfo#getDocCount` instead of trying to do it via `_all_docs`


---

[GitHub] bahir pull request #61: [BAHIR-154] Refactor sql-cloudant to use Cloudant's ...

Posted by emlaver <gi...@git.apache.org>.
Github user emlaver commented on a diff in the pull request:

    https://github.com/apache/bahir/pull/61#discussion_r160077466
  
    --- Diff: sql-cloudant/src/main/scala/org/apache/bahir/cloudant/CloudantConfig.scala ---
    @@ -16,34 +16,127 @@
      */
     package org.apache.bahir.cloudant
     
    -import java.net.URLEncoder
    +import java.net.{URL, URLEncoder}
     
    -import play.api.libs.json.{JsArray, JsObject, Json, JsValue}
    +import scala.collection.JavaConverters._
    +import scala.collection.mutable
    +import scala.reflect.io.File
    +
    +import com.cloudant.client.api.{ClientBuilder, CloudantClient, Database}
    +import com.cloudant.client.api.model.SearchResult
    +import com.cloudant.client.api.views._
    +import com.cloudant.http.{Http, HttpConnection}
    +import com.cloudant.http.interceptors.Replay429Interceptor
    +import com.google.gson.{JsonObject, JsonParser}
     
     import org.apache.bahir.cloudant.common._
    +import org.apache.bahir.cloudant.common.JsonUtil.JsonConverter
     
     /*
     * Only allow one field pushdown now
     * as the filter today does not tell how to link the filters out And v.s. Or
     */
     
     class CloudantConfig(val protocol: String, val host: String,
    -                     val dbName: String, val indexName: String, val viewName: String)
    +                     val dbName: String, val indexPath: String, val viewPath: String)
                         (implicit val username: String, val password: String,
                          val partitions: Int, val maxInPartition: Int, val minInPartition: Int,
                          val requestTimeout: Long, val bulkSize: Int, val schemaSampleSize: Int,
                          val createDBOnSave: Boolean, val endpoint: String,
                          val useQuery: Boolean = false, val queryLimit: Int)
       extends Serializable {
     
    +  @transient private lazy val client: CloudantClient = ClientBuilder
    +    .url(getClientUrl)
    +    .username(username)
    +    .password(password)
    +    .interceptors(Replay429Interceptor.WITH_DEFAULTS)
    +    .build
    +  @transient private lazy val database: Database = client.database(dbName, false)
       lazy val dbUrl: String = {protocol + "://" + host + "/" + dbName}
    +  lazy val designDoc: String = {
    +    if (viewPath != null && viewPath.nonEmpty) {
    +      viewPath.split("/")(1)
    +    } else {
    +    null
    +    }
    +  }
    +  lazy val searchName: String = {
    +    // verify that the index path matches '_design/ddoc/_search/searchname'
    +    if (indexPath != null && indexPath.nonEmpty && indexPath.matches("\\w+\\/\\w+\\/\\w+\\/\\w+")) {
    --- End diff --
    
    Improved regex pattern in 55cc844.


---

[GitHub] bahir issue #61: [BAHIR-154] Refactor sql-cloudant to use Cloudant's java-cl...

Posted by ApacheBahir <gi...@git.apache.org>.
Github user ApacheBahir commented on the issue:

    https://github.com/apache/bahir/pull/61
  
    :white_check_mark: Build successful
     



---

[GitHub] bahir issue #61: [BAHIR-154] Refactor sql-cloudant to use Cloudant's java-cl...

Posted by ApacheBahir <gi...@git.apache.org>.
Github user ApacheBahir commented on the issue:

    https://github.com/apache/bahir/pull/61
  
    
    Refer to this link for build results (access rights to CI server needed): 
    http://169.45.79.58:8080/job/bahir_spark_pr_builder/145/



---

[GitHub] bahir issue #61: [BAHIR-154] Refactor sql-cloudant to use Cloudant's java-cl...

Posted by lresende <gi...@git.apache.org>.
Github user lresende commented on the issue:

    https://github.com/apache/bahir/pull/61
  
    @emlaver Could you please rebase this PR?


---

[GitHub] bahir pull request #61: [BAHIR-154] Refactor sql-cloudant to use Cloudant's ...

Posted by emlaver <gi...@git.apache.org>.
Github user emlaver commented on a diff in the pull request:

    https://github.com/apache/bahir/pull/61#discussion_r160077380
  
    --- Diff: sql-cloudant/src/main/scala/org/apache/bahir/cloudant/common/JsonStoreRDD.scala ---
    @@ -191,20 +199,30 @@ class JsonStoreRDD(sc: SparkContext, config: CloudantConfig)
        (0 until totalPartition).map(i => {
           val skip = i * limitPerPartition
           new JsonStoreRDDPartition(url, skip, limitPerPartition, i,
    -          config, selector, fields, queryUsed).asInstanceOf[Partition]
    +          config, selector, fields, queryUsed)
    +        .asInstanceOf[Partition]
         }).toArray
       }
     
       override def compute(splitIn: Partition, context: TaskContext):
           Iterator[String] = {
         val myPartition = splitIn.asInstanceOf[JsonStoreRDDPartition]
         implicit val postData : String = {
    +      val jsonObject = new JsonObject
           if (myPartition.queryUsed && myPartition.fields != null) {
    -        Json.stringify(Json.obj("selector" -> myPartition.selector, "fields" -> myPartition.fields,
    -            "limit" -> myPartition.limit, "skip" -> myPartition.skip))
    +        // Json.stringify(Json.obj("selector" -> myPartition.selector, "fields" ->
    +        // myPartition.fields, "limit" -> myPartition.limit, "skip" -> myPartition.skip))
    +        jsonObject.add("selector", myPartition.selector)
    +        jsonObject.add("fields", myPartition.fields)
    +        jsonObject.addProperty("skip", myPartition.skip)
    +        jsonObject.toString
           } else if (myPartition.queryUsed) {
    -        Json.stringify(Json.obj("selector" -> myPartition.selector, "limit" -> myPartition.limit,
    -            "skip" -> myPartition.skip))
    +        // Json.stringify(Json.obj("selector" -> myPartition.selector,
    +        // "limit" -> myPartition.limit, "skip" -> myPartition.skip))
    +        jsonObject.add("selector", myPartition.selector)
    --- End diff --
    
    Fixed in 55cc844.


---

[GitHub] bahir issue #61: [BAHIR-154] Refactor sql-cloudant to use Cloudant's java-cl...

Posted by ApacheBahir <gi...@git.apache.org>.
Github user ApacheBahir commented on the issue:

    https://github.com/apache/bahir/pull/61
  
    
    Refer to this link for build results (access rights to CI server needed): 
    http://169.45.79.58:8080/job/bahir_spark_pr_builder/156/



---

[GitHub] bahir pull request #61: [BAHIR-154] Refactor sql-cloudant to use Cloudant's ...

Posted by emlaver <gi...@git.apache.org>.
Github user emlaver commented on a diff in the pull request:

    https://github.com/apache/bahir/pull/61#discussion_r160077413
  
    --- Diff: sql-cloudant/src/main/scala/org/apache/bahir/cloudant/CloudantConfig.scala ---
    @@ -95,15 +189,49 @@ class CloudantConfig(val protocol: String, val host: String,
         }
       }
     
    +  def getTotalDocCount: Int = {
    +    val limit = 1
    +    if (viewPath != null) {
    +      // "limit=" + limit + "&skip=" + skip
    +      buildViewRequest(limit, includeDocs = false).build().getResponse.getTotalRowCount.toInt
    +    } else {
    +      // /_all_docs?limit=1
    +      // Note: java-cloudant's AllDocsRequest doesn't have a getTotalRowCount method
    +      // buildAllDocsRequest(1, includeDocs = false).build().getResponse.getTotalRowCount.toInt
    +      val response = client.executeRequest(Http.GET(
    +        new URL(database.getDBUri + File.separator + endpoint + "?limit=" + limit)))
    +      getResultTotalRows(response.responseAsString)
    +    }
    +  }
    +
    +  def getDocs(limit: Int): List[JsonObject] = {
    +    if (viewPath != null) {
    +      // "limit=" + limit + "&skip=" + skip
    --- End diff --
    
    Removed in 55cc844.


---

[GitHub] bahir pull request #61: [BAHIR-154] Refactor sql-cloudant to use Cloudant's ...

Posted by emlaver <gi...@git.apache.org>.
Github user emlaver commented on a diff in the pull request:

    https://github.com/apache/bahir/pull/61#discussion_r160077474
  
    --- Diff: sql-cloudant/src/main/scala/org/apache/bahir/cloudant/CloudantConfig.scala ---
    @@ -16,34 +16,127 @@
      */
     package org.apache.bahir.cloudant
     
    -import java.net.URLEncoder
    +import java.net.{URL, URLEncoder}
     
    -import play.api.libs.json.{JsArray, JsObject, Json, JsValue}
    +import scala.collection.JavaConverters._
    +import scala.collection.mutable
    +import scala.reflect.io.File
    +
    +import com.cloudant.client.api.{ClientBuilder, CloudantClient, Database}
    +import com.cloudant.client.api.model.SearchResult
    +import com.cloudant.client.api.views._
    +import com.cloudant.http.{Http, HttpConnection}
    +import com.cloudant.http.interceptors.Replay429Interceptor
    +import com.google.gson.{JsonObject, JsonParser}
     
     import org.apache.bahir.cloudant.common._
    +import org.apache.bahir.cloudant.common.JsonUtil.JsonConverter
     
     /*
     * Only allow one field pushdown now
     * as the filter today does not tell how to link the filters out And v.s. Or
     */
     
     class CloudantConfig(val protocol: String, val host: String,
    -                     val dbName: String, val indexName: String, val viewName: String)
    +                     val dbName: String, val indexPath: String, val viewPath: String)
                         (implicit val username: String, val password: String,
                          val partitions: Int, val maxInPartition: Int, val minInPartition: Int,
                          val requestTimeout: Long, val bulkSize: Int, val schemaSampleSize: Int,
                          val createDBOnSave: Boolean, val endpoint: String,
                          val useQuery: Boolean = false, val queryLimit: Int)
       extends Serializable {
     
    +  @transient private lazy val client: CloudantClient = ClientBuilder
    +    .url(getClientUrl)
    +    .username(username)
    +    .password(password)
    +    .interceptors(Replay429Interceptor.WITH_DEFAULTS)
    +    .build
    +  @transient private lazy val database: Database = client.database(dbName, false)
       lazy val dbUrl: String = {protocol + "://" + host + "/" + dbName}
    +  lazy val designDoc: String = {
    +    if (viewPath != null && viewPath.nonEmpty) {
    +      viewPath.split("/")(1)
    +    } else {
    +    null
    +    }
    +  }
    +  lazy val searchName: String = {
    +    // verify that the index path matches '_design/ddoc/_search/searchname'
    +    if (indexPath != null && indexPath.nonEmpty && indexPath.matches("\\w+\\/\\w+\\/\\w+\\/\\w+")) {
    +      val splitPath = indexPath.split(File.separator)
    --- End diff --
    
    Improved regex pattern in 55cc844.


---

[GitHub] bahir pull request #61: [BAHIR-154] Refactor sql-cloudant to use Cloudant's ...

Posted by ricellis <gi...@git.apache.org>.
Github user ricellis commented on a diff in the pull request:

    https://github.com/apache/bahir/pull/61#discussion_r159889787
  
    --- Diff: sql-cloudant/src/main/scala/org/apache/bahir/cloudant/common/JsonStoreRDD.scala ---
    @@ -191,20 +199,30 @@ class JsonStoreRDD(sc: SparkContext, config: CloudantConfig)
        (0 until totalPartition).map(i => {
           val skip = i * limitPerPartition
           new JsonStoreRDDPartition(url, skip, limitPerPartition, i,
    -          config, selector, fields, queryUsed).asInstanceOf[Partition]
    +          config, selector, fields, queryUsed)
    +        .asInstanceOf[Partition]
         }).toArray
       }
     
       override def compute(splitIn: Partition, context: TaskContext):
           Iterator[String] = {
         val myPartition = splitIn.asInstanceOf[JsonStoreRDDPartition]
         implicit val postData : String = {
    +      val jsonObject = new JsonObject
           if (myPartition.queryUsed && myPartition.fields != null) {
    -        Json.stringify(Json.obj("selector" -> myPartition.selector, "fields" -> myPartition.fields,
    -            "limit" -> myPartition.limit, "skip" -> myPartition.skip))
    +        // Json.stringify(Json.obj("selector" -> myPartition.selector, "fields" ->
    +        // myPartition.fields, "limit" -> myPartition.limit, "skip" -> myPartition.skip))
    --- End diff --
    
    remove


---

[GitHub] bahir issue #61: [BAHIR-154] Refactor sql-cloudant to use Cloudant's java-cl...

Posted by ApacheBahir <gi...@git.apache.org>.
Github user ApacheBahir commented on the issue:

    https://github.com/apache/bahir/pull/61
  
    :white_check_mark: Build successful
     



---

[GitHub] bahir issue #61: [BAHIR-154] Refactor sql-cloudant to use Cloudant's java-cl...

Posted by ApacheBahir <gi...@git.apache.org>.
Github user ApacheBahir commented on the issue:

    https://github.com/apache/bahir/pull/61
  
    
    Refer to this link for build results (access rights to CI server needed): 
    http://169.45.79.58:8080/job/bahir_spark_pr_builder/148/



---

[GitHub] bahir issue #61: [BAHIR-154] Refactor sql-cloudant to use Cloudant's java-cl...

Posted by ApacheBahir <gi...@git.apache.org>.
Github user ApacheBahir commented on the issue:

    https://github.com/apache/bahir/pull/61
  
    
    Refer to this link for build results (access rights to CI server needed): 
    http://169.45.79.58:8080/job/bahir_spark_pr_builder/147/



---

[GitHub] bahir issue #61: [BAHIR-154] Refactor sql-cloudant to use Cloudant's java-cl...

Posted by ApacheBahir <gi...@git.apache.org>.
Github user ApacheBahir commented on the issue:

    https://github.com/apache/bahir/pull/61
  
    :white_check_mark: Build successful
     



---

[GitHub] bahir pull request #61: [BAHIR-154] Refactor sql-cloudant to use Cloudant's ...

Posted by emlaver <gi...@git.apache.org>.
Github user emlaver commented on a diff in the pull request:

    https://github.com/apache/bahir/pull/61#discussion_r160077409
  
    --- Diff: sql-cloudant/src/main/scala/org/apache/bahir/cloudant/common/JsonStoreRDD.scala ---
    @@ -174,7 +178,11 @@ class JsonStoreRDD(sc: SparkContext, config: CloudantConfig)
     
         implicit val postData : String = {
           if (queryUsed) {
    -        Json.stringify(Json.obj("selector" -> selector, "limit" -> 1))
    +        val jsonSelector = new JsonObject
    +        jsonSelector.addProperty("selector", selector.toString)
    +        jsonSelector.addProperty("limit", 1)
    +        jsonSelector.toString
    +        // Json.stringify(Json.obj("selector" -> selector, "limit" -> 1))
    --- End diff --
    
    Removed in 55cc844.


---

[GitHub] bahir pull request #61: [BAHIR-154] Refactor sql-cloudant to use Cloudant's ...

Posted by emlaver <gi...@git.apache.org>.
Github user emlaver commented on a diff in the pull request:

    https://github.com/apache/bahir/pull/61#discussion_r160077423
  
    --- Diff: sql-cloudant/src/main/scala/org/apache/bahir/cloudant/CloudantConfig.scala ---
    @@ -16,34 +16,127 @@
      */
     package org.apache.bahir.cloudant
     
    -import java.net.URLEncoder
    +import java.net.{URL, URLEncoder}
     
    -import play.api.libs.json.{JsArray, JsObject, Json, JsValue}
    +import scala.collection.JavaConverters._
    +import scala.collection.mutable
    +import scala.reflect.io.File
    +
    +import com.cloudant.client.api.{ClientBuilder, CloudantClient, Database}
    +import com.cloudant.client.api.model.SearchResult
    +import com.cloudant.client.api.views._
    +import com.cloudant.http.{Http, HttpConnection}
    +import com.cloudant.http.interceptors.Replay429Interceptor
    +import com.google.gson.{JsonObject, JsonParser}
     
     import org.apache.bahir.cloudant.common._
    +import org.apache.bahir.cloudant.common.JsonUtil.JsonConverter
     
     /*
     * Only allow one field pushdown now
     * as the filter today does not tell how to link the filters out And v.s. Or
     */
     
     class CloudantConfig(val protocol: String, val host: String,
    -                     val dbName: String, val indexName: String, val viewName: String)
    +                     val dbName: String, val indexPath: String, val viewPath: String)
                         (implicit val username: String, val password: String,
                          val partitions: Int, val maxInPartition: Int, val minInPartition: Int,
                          val requestTimeout: Long, val bulkSize: Int, val schemaSampleSize: Int,
                          val createDBOnSave: Boolean, val endpoint: String,
                          val useQuery: Boolean = false, val queryLimit: Int)
       extends Serializable {
     
    +  @transient private lazy val client: CloudantClient = ClientBuilder
    +    .url(getClientUrl)
    +    .username(username)
    +    .password(password)
    +    .interceptors(Replay429Interceptor.WITH_DEFAULTS)
    +    .build
    +  @transient private lazy val database: Database = client.database(dbName, false)
       lazy val dbUrl: String = {protocol + "://" + host + "/" + dbName}
    +  lazy val designDoc: String = {
    +    if (viewPath != null && viewPath.nonEmpty) {
    +      viewPath.split("/")(1)
    +    } else {
    +    null
    +    }
    +  }
    +  lazy val searchName: String = {
    +    // verify that the index path matches '_design/ddoc/_search/searchname'
    +    if (indexPath != null && indexPath.nonEmpty && indexPath.matches("\\w+\\/\\w+\\/\\w+\\/\\w+")) {
    +      val splitPath = indexPath.split(File.separator)
    +      // return 'design-doc/search-name'
    +      splitPath(1) + File.separator + splitPath(3)
    +    } else {
    +      null
    +    }
    +  }
    +  lazy val viewName: String = {
    +    if (viewPath != null && viewPath.nonEmpty) {
    +      val splitViewPath = viewPath.split(File.separator)
    +      if(splitViewPath(3).contains("?")) {
    +        splitViewPath(3).substring(0, splitViewPath(3).indexOf("?"))
    +      } else {
    +        splitViewPath(3)
    +      }
    +    } else {
    +      null
    +    }
    +  }
     
       val pkField = "_id"
       val defaultIndex: String = endpoint
       val default_filter: String = "*:*"
     
    -  def getDbUrl: String = {
    -    dbUrl
    +  def buildAllDocsRequest(limit: Int, includeDocs: Boolean = true): AllDocsRequestBuilder = {
    +    var allDocsReq = database.getAllDocsRequestBuilder.includeDocs(includeDocs)
    +    if (limit != JsonStoreConfigManager.ALLDOCS_OR_CHANGES_LIMIT) {
    +      allDocsReq = allDocsReq.limit(limit)
    +    }
    +    allDocsReq
    +  }
    +
    +  def buildViewRequest(limit: Int, includeDocs: Boolean = true):
    +  UnpaginatedRequestBuilder[String, String] = {
    +    val viewReq = database.getViewRequestBuilder(designDoc, viewName)
    +      .newRequest(Key.Type.STRING, classOf[String])
    +      .includeDocs(includeDocs)
    +    if (limit != JsonStoreConfigManager.ALLDOCS_OR_CHANGES_LIMIT) {
    +      viewReq.limit(limit)
    +    }
    +    viewReq
    +  }
    +
    +  def buildSearchRequest(limit: Int): SearchResult[JsonObject] = {
    +    val searchReq = database.search(searchName)
    +    if (limit != JsonStoreConfigManager.ALLDOCS_OR_CHANGES_LIMIT) {
    +      searchReq.limit(limit)
    +    }
    +    searchReq.querySearchResult(default_filter, classOf[JsonObject])
    +  }
    +
    +  def executeRequest(stringUrl: String, postData: String = null): HttpConnection = {
    +    val url = new URL(stringUrl)
    +    if(postData != null) {
    +      val conn = Http.POST(url, "application/json")
    +      conn.setRequestBody(postData)
    +      conn.requestProperties.put("Accept", "application/json")
    +      conn.requestProperties.put("User-Agent", "spark-cloudant")
    +      client.executeRequest(conn)
    +    } else {
    +      val conn = Http.GET(url)
    +      conn.requestProperties.put("Accept", "application/json")
    --- End diff --
    
    Removed in 55cc844.


---

[GitHub] bahir pull request #61: [BAHIR-154] Refactor sql-cloudant to use Cloudant's ...

Posted by ricellis <gi...@git.apache.org>.
Github user ricellis commented on a diff in the pull request:

    https://github.com/apache/bahir/pull/61#discussion_r159889542
  
    --- Diff: sql-cloudant/src/main/scala/org/apache/bahir/cloudant/common/JsonStoreRDD.scala ---
    @@ -101,24 +105,24 @@ class JsonStoreRDD(sc: SparkContext, config: CloudantConfig)
           case LessThanOrEqual(attr, v) => ("$lte", v)
           case _ => (null, null)
         }
    -    val convertedV: JsValue = {
    +    val convertedV: JsonElement = {
           // TODO Better handing of other types
           if (value != null) {
             value match {
    -          case s: String => Json.toJson(s)
    -          case l: Long => Json.toJson(l)
    -          case d: Double => Json.toJson(d)
    -          case i: Int => Json.toJson(i)
    -          case b: Boolean => Json.toJson(b)
    -          case t: java.sql.Timestamp => Json.toJson(t)
    +          case s: String => parser.parse(s)// Json.toJson(s)
    --- End diff --
    
    some old code here that should probably be removed (and on the following line)


---

[GitHub] bahir issue #61: [BAHIR-154] Refactor sql-cloudant to use Cloudant's java-cl...

Posted by ApacheBahir <gi...@git.apache.org>.
Github user ApacheBahir commented on the issue:

    https://github.com/apache/bahir/pull/61
  
    :white_check_mark: Build successful
     



---

[GitHub] bahir pull request #61: [BAHIR-154] Refactor sql-cloudant to use Cloudant's ...

Posted by ricellis <gi...@git.apache.org>.
Github user ricellis commented on a diff in the pull request:

    https://github.com/apache/bahir/pull/61#discussion_r159847773
  
    --- Diff: sql-cloudant/src/main/scala/org/apache/bahir/cloudant/CloudantConfig.scala ---
    @@ -54,20 +147,21 @@ class CloudantConfig(val protocol: String, val host: String,
         createDBOnSave
       }
     
    -  def getLastNum(result: JsValue): JsValue = (result \ "last_seq").get
    +  def getClientUrl: URL = {
    +    new URL(protocol + "://" + host)
    +  }
    +
    +  def getLastNum(result: JsonObject): JsonObject = result.get("last_seq").getAsJsonObject
     
       /* Url containing limit for docs in a Cloudant database.
       * If a view is not defined, use the _all_docs endpoint.
       * @return url with one doc limit for retrieving total doc count
       */
       def getUrl(limit: Int, excludeDDoc: Boolean = false): String = {
    --- End diff --
    
    The comment suggests that this method is used to get a URL with `limit=1` for the purpose of getting a doc count, but it is not used by `getTotalDocCount`; it does, however, appear to be used by `getMany` with a different limit for query results.
    FWIW, I can't see `getTotalDocCount` being called anywhere in the new code either.


---

[GitHub] bahir issue #61: [BAHIR-154] Refactor sql-cloudant to use Cloudant's java-cl...

Posted by ApacheBahir <gi...@git.apache.org>.
Github user ApacheBahir commented on the issue:

    https://github.com/apache/bahir/pull/61
  
    
    Refer to this link for build results (access rights to CI server needed): 
    http://169.45.79.58:8080/job/bahir_spark_pr_builder/157/



---

[GitHub] bahir pull request #61: [BAHIR-154] Refactor sql-cloudant to use Cloudant's ...

Posted by emlaver <gi...@git.apache.org>.
Github user emlaver commented on a diff in the pull request:

    https://github.com/apache/bahir/pull/61#discussion_r160077403
  
    --- Diff: sql-cloudant/src/main/scala/org/apache/bahir/cloudant/common/JsonStoreRDD.scala ---
    @@ -191,20 +199,30 @@ class JsonStoreRDD(sc: SparkContext, config: CloudantConfig)
        (0 until totalPartition).map(i => {
           val skip = i * limitPerPartition
           new JsonStoreRDDPartition(url, skip, limitPerPartition, i,
    -          config, selector, fields, queryUsed).asInstanceOf[Partition]
    +          config, selector, fields, queryUsed)
    +        .asInstanceOf[Partition]
         }).toArray
       }
     
       override def compute(splitIn: Partition, context: TaskContext):
           Iterator[String] = {
         val myPartition = splitIn.asInstanceOf[JsonStoreRDDPartition]
         implicit val postData : String = {
    +      val jsonObject = new JsonObject
           if (myPartition.queryUsed && myPartition.fields != null) {
    -        Json.stringify(Json.obj("selector" -> myPartition.selector, "fields" -> myPartition.fields,
    -            "limit" -> myPartition.limit, "skip" -> myPartition.skip))
    +        // Json.stringify(Json.obj("selector" -> myPartition.selector, "fields" ->
    +        // myPartition.fields, "limit" -> myPartition.limit, "skip" -> myPartition.skip))
    --- End diff --
    
    Removed in 55cc844.


---

[GitHub] bahir pull request #61: [BAHIR-154] Refactor sql-cloudant to use Cloudant's ...

Posted by ricellis <gi...@git.apache.org>.
Github user ricellis commented on a diff in the pull request:

    https://github.com/apache/bahir/pull/61#discussion_r159709935
  
    --- Diff: sql-cloudant/src/main/scala/org/apache/bahir/cloudant/CloudantConfig.scala ---
    @@ -95,15 +189,49 @@ class CloudantConfig(val protocol: String, val host: String,
         }
       }
     
    +  def getTotalDocCount: Int = {
    +    val limit = 1
    +    if (viewPath != null) {
    +      // "limit=" + limit + "&skip=" + skip
    +      buildViewRequest(limit, includeDocs = false).build().getResponse.getTotalRowCount.toInt
    +    } else {
    +      // /_all_docs?limit=1
    +      // Note: java-cloudant's AllDocsRequest doesn't have a getTotalRowCount method
    +      // buildAllDocsRequest(1, includeDocs = false).build().getResponse.getTotalRowCount.toInt
    +      val response = client.executeRequest(Http.GET(
    +        new URL(database.getDBUri + File.separator + endpoint + "?limit=" + limit)))
    +      getResultTotalRows(response.responseAsString)
    +    }
    +  }
    +
    +  def getDocs(limit: Int): List[JsonObject] = {
    +    if (viewPath != null) {
    +      // "limit=" + limit + "&skip=" + skip
    +      buildViewRequest(limit).build().getResponse.getDocsAs(classOf[JsonObject]).asScala.toList
    +    } else if (indexPath != null) {
    +      var searchDocs = mutable.ListBuffer[JsonObject]()
    +      for (result: SearchResult[JsonObject]#SearchResultRow <-
    +           buildSearchRequest(limit).getRows.asScala) {
    +        searchDocs += result.getDoc
    +      }
    +      searchDocs.toList
    +    } else {
    +      // /_all_docs?limit=1
    +      // val response = client.executeRequest(Http.GET(
    --- End diff --
    
    Remove commented out code?


---

[GitHub] bahir pull request #61: [BAHIR-154] Refactor sql-cloudant to use Cloudant's ...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/bahir/pull/61


---

[GitHub] bahir pull request #61: [BAHIR-154] Refactor sql-cloudant to use Cloudant's ...

Posted by emlaver <gi...@git.apache.org>.
Github user emlaver commented on a diff in the pull request:

    https://github.com/apache/bahir/pull/61#discussion_r161142828
  
    --- Diff: sql-cloudant/src/main/scala/org/apache/bahir/cloudant/CloudantConfig.scala ---
    @@ -16,34 +16,127 @@
      */
     package org.apache.bahir.cloudant
     
    -import java.net.URLEncoder
    +import java.net.{URL, URLEncoder}
     
    -import play.api.libs.json.{JsArray, JsObject, Json, JsValue}
    +import scala.collection.JavaConverters._
    +import scala.collection.mutable
    +import scala.reflect.io.File
    +
    +import com.cloudant.client.api.{ClientBuilder, CloudantClient, Database}
    +import com.cloudant.client.api.model.SearchResult
    +import com.cloudant.client.api.views._
    +import com.cloudant.http.{Http, HttpConnection}
    +import com.cloudant.http.interceptors.Replay429Interceptor
    +import com.google.gson.{JsonObject, JsonParser}
     
     import org.apache.bahir.cloudant.common._
    +import org.apache.bahir.cloudant.common.JsonUtil.JsonConverter
     
     /*
     * Only allow one field pushdown now
     * as the filter today does not tell how to link the filters out And v.s. Or
     */
     
     class CloudantConfig(val protocol: String, val host: String,
    -                     val dbName: String, val indexName: String, val viewName: String)
    +                     val dbName: String, val indexPath: String, val viewPath: String)
                         (implicit val username: String, val password: String,
                          val partitions: Int, val maxInPartition: Int, val minInPartition: Int,
                          val requestTimeout: Long, val bulkSize: Int, val schemaSampleSize: Int,
                          val createDBOnSave: Boolean, val endpoint: String,
                          val useQuery: Boolean = false, val queryLimit: Int)
       extends Serializable {
     
    +  @transient private lazy val client: CloudantClient = ClientBuilder
    +    .url(getClientUrl)
    +    .username(username)
    +    .password(password)
    +    .interceptors(Replay429Interceptor.WITH_DEFAULTS)
    --- End diff --
    
    Added option in 9fefcdc.


---