Posted to commits@livy.apache.org by lr...@apache.org on 2017/07/20 01:37:15 UTC
[29/50] [abbrv] incubator-livy-website git commit: [BAHIR-101] Spark
SQL datasource for CouchDB/Cloudant
http://git-wip-us.apache.org/repos/asf/incubator-livy-website/blob/f0d9a84f/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/common/JsonStoreDataAccess.scala
----------------------------------------------------------------------
diff --git a/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/common/JsonStoreDataAccess.scala b/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/common/JsonStoreDataAccess.scala
new file mode 100644
index 0000000..e84a44c
--- /dev/null
+++ b/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/common/JsonStoreDataAccess.scala
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bahir.cloudant.common
+
+import java.util.concurrent.atomic.AtomicInteger
+
+import scala.collection.mutable.HashMap
+import scala.concurrent._
+import scala.concurrent.duration._
+import scala.language.implicitConversions
+import scala.util.{Failure, Success}
+
+import scalaj.http.{Http, HttpRequest, HttpResponse}
+import ExecutionContext.Implicits.global
+import org.slf4j.LoggerFactory
+import play.api.libs.json._
+
+import org.apache.spark.sql.sources._
+
+import org.apache.bahir.cloudant.CloudantConfig
+
+
+class JsonStoreDataAccess(config: CloudantConfig) {
+ lazy val logger = LoggerFactory.getLogger(getClass)
+ implicit lazy val timeout = config.requestTimeout
+
+ def getOne()(implicit columns: Array[String] = null): Seq[String] = {
+ var r = this.getQueryResult[Seq[String]](config.getOneUrlExcludeDDoc1(), processAll)
+ if (r.size == 0) {
+ r = this.getQueryResult[Seq[String]](config.getOneUrlExcludeDDoc2(), processAll)
+ }
+ if (r.size == 0) {
+ throw new RuntimeException("Database " + config.getDbname() +
+ " doesn't have any non-design documents!")
+ } else {
+ r
+ }
+ }
+
+ def getMany(limit: Int)(implicit columns: Array[String] = null): Seq[String] = {
+ if (limit == 0) {
+ throw new RuntimeException("Database " + config.getDbname() +
+ " schema sample size is 0!")
+ }
+ if (limit < -1) {
+ throw new RuntimeException("Database " + config.getDbname() +
+ " schema sample size is " + limit + "!")
+ }
+ var r = this.getQueryResult[Seq[String]](config.getAllDocsUrl(limit), processAll)
+ if (r.size == 0) {
+ r = this.getQueryResult[Seq[String]](config.getAllDocsUrlExcludeDDoc(limit), processAll)
+ }
+ if (r.size == 0) {
+ throw new RuntimeException("Database " + config.getDbname() +
+ " doesn't have any non-design documents!")
+ } else {
+ r
+ }
+ }
+
+ def getAll[T](url: String)
+ (implicit columns: Array[String] = null,
+ attrToFilters: Map[String, Array[Filter]] = null): Seq[String] = {
+ this.getQueryResult[Seq[String]](url, processAll)
+ }
+
+ def getIterator(skip: Int, limit: Int, url: String)
+ (implicit columns: Array[String] = null,
+ attrToFilters: Map[String, Array[Filter]] = null): Iterator[String] = {
+ implicit def convertSkip(skip: Int): String = {
+ val url = config.getLastUrl(skip)
+ if (url == null) {
+ skip.toString()
+ } else {
+ this.getQueryResult[String](url,
+ { result => config.getLastNum(Json.parse(result)).as[JsString].value})
+ }
+ }
+ val newUrl = config.getSubSetUrl(url, skip, limit)
+ this.getQueryResult[Iterator[String]](newUrl, processIterator)
+ }
+
+ def getTotalRows(url: String): Int = {
+ val totalUrl = config.getTotalUrl(url)
+ this.getQueryResult[Int](totalUrl,
+ { result => config.getTotalRows(Json.parse(result))})
+ }
+
+ private def processAll (result: String)
+ (implicit columns: Array[String],
+ attrToFilters: Map[String, Array[Filter]] = null) = {
+ logger.debug(s"processAll columns:$columns, attrToFilters:$attrToFilters")
+ val jsonResult: JsValue = Json.parse(result)
+ var rows = config.getRows(jsonResult)
+ if (config.viewName == null) {
+ // filter out design documents
+ rows = rows.filter(r => FilterDDocs.filter(r))
+ }
+ rows.map(r => convert(r))
+ }
+
+ private def processIterator (result: String)
+ (implicit columns: Array[String],
+ attrToFilters: Map[String, Array[Filter]] = null): Iterator[String] = {
+ processAll(result).iterator
+ }
+
+ private def convert(rec: JsValue)(implicit columns: Array[String]): String = {
+ if (columns == null) return Json.stringify(Json.toJson(rec))
+ val m = new HashMap[String, JsValue]()
+ for ( x <- columns) {
+ val field = JsonUtil.getField(rec, x).getOrElse(JsNull)
+ m.put(x, field)
+ }
+ val result = Json.stringify(Json.toJson(m.toMap))
+ logger.debug(s"converted: $result")
+ result
+ }
+
+
+ def getChanges(url: String, processResults: (String) => String): String = {
+ getQueryResult(url, processResults)
+ }
+
+
+ private def getQueryResult[T]
+ (url: String, postProcessor: (String) => T)
+ (implicit columns: Array[String] = null,
+ attrToFilters: Map[String, Array[Filter]] = null) : T = {
+ logger.warn("Loading data from Cloudant using query: " + url)
+ val requestTimeout = config.requestTimeout.toInt
+ val clRequest: HttpRequest = config.username match {
+ case null =>
+ Http(url)
+ .timeout(connTimeoutMs = 1000, readTimeoutMs = requestTimeout)
+ .header("User-Agent", "spark-cloudant")
+ case _ =>
+ Http(url)
+ .timeout(connTimeoutMs = 1000, readTimeoutMs = requestTimeout)
+ .header("User-Agent", "spark-cloudant")
+ .auth(config.username, config.password)
+ }
+
+ val clResponse: HttpResponse[String] = clRequest.execute()
+ if (!clResponse.isSuccess) {
+ throw new RuntimeException("Database " + config.getDbname() +
+ " request error: " + clResponse.body)
+ }
+ val data = postProcessor(clResponse.body)
+ logger.debug(s"got result:$data")
+ data
+ }
+
+
+ def createDB(): Unit = {
+ val url = config.getDbUrl()
+ val requestTimeout = config.requestTimeout.toInt
+ val clRequest: HttpRequest = config.username match {
+ case null =>
+ Http(url)
+ .method("put")
+ .timeout(connTimeoutMs = 1000, readTimeoutMs = requestTimeout)
+ .header("User-Agent", "spark-cloudant")
+ case _ =>
+ Http(url)
+ .method("put")
+ .timeout(connTimeoutMs = 1000, readTimeoutMs = requestTimeout)
+ .header("User-Agent", "spark-cloudant")
+ .auth(config.username, config.password)
+ }
+
+ val clResponse: HttpResponse[String] = clRequest.execute()
+ if (!clResponse.isSuccess) {
+ throw new RuntimeException("Database " + config.getDbname() +
+ " create error: " + clResponse.body)
+ } else {
+ logger.warn(s"Database ${config.getDbname()} was created.")
+ }
+ }
+
+
+ def getClPostRequest(data: String): HttpRequest = {
+ val url = config.getBulkPostUrl()
+ val requestTimeout = config.requestTimeout.toInt
+ config.username match {
+ case null =>
+ Http(url)
+ .postData(data)
+ .timeout(connTimeoutMs = 1000, readTimeoutMs = requestTimeout)
+ .header("Content-Type", "application/json")
+ .header("User-Agent", "spark-cloudant")
+ case _ =>
+ Http(url)
+ .postData(data)
+ .timeout(connTimeoutMs = 1000, readTimeoutMs = requestTimeout)
+ .header("Content-Type", "application/json")
+ .header("User-Agent", "spark-cloudant")
+ .auth(config.username, config.password)
+ }
+ }
+
+
+ def saveAll(rows: List[String]): Unit = {
+ if (rows.size == 0) return
+ val bulkSize = config.bulkSize
+ val bulks = rows.grouped(bulkSize).toList
+ val totalBulks = bulks.size
+ logger.debug(s"total records:${rows.size}=bulkSize:$bulkSize * totalBulks:$totalBulks")
+
+ val futures = bulks.map( bulk => {
+ val data = config.getBulkRows(bulk)
+ val clRequest: HttpRequest = getClPostRequest(data)
+ Future {
+ clRequest.execute()
+ }
+ }
+ )
+ // remaining - number of requests that still need to succeed
+ val remaining = new AtomicInteger(futures.length)
+ val p = Promise[HttpResponse[String]]
+ futures foreach {
+ _ onComplete {
+ case Success(clResponse: HttpResponse[String]) =>
+ // check whether saving failed for at least one of the docs
+ val resBody: String = clResponse.body
+ val isErr = (resBody contains config.getConflictErrStr()) ||
+ (resBody contains config.getForbiddenErrStr())
+ if (!(clResponse.isSuccess) || isErr) {
+ val e = new RuntimeException("Save to database:" + config.getDbname() +
+ " failed with reason: " + clResponse.body)
+ p.tryFailure(e)
+ } else if (remaining.decrementAndGet() == 0) {
+ // succeed the whole save operation once all requests succeed
+ p.trySuccess(clResponse)
+ }
+ // if at least one save request fails, fail the whole save operation
+ case Failure(e) =>
+ p.tryFailure(e)
+ }
+ }
+
+ val mainFtr = p.future
+ mainFtr onSuccess {
+ case clResponsesList =>
+ logger.warn(s"Saved total ${rows.length} " +
+ s"with bulkSize $bulkSize " +
+ s"for database: ${config.getDbname()}")
+ }
+ mainFtr onFailure {
+ case e =>
+ throw new RuntimeException("Save to database:" + config.getDbname() +
+ " failed with reason: " + e.getMessage)
+ }
+ Await.result(mainFtr, (config.requestTimeout * totalBulks).millis) // scalastyle:ignore
+ }
+
+}
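
The fan-out in saveAll is a reusable fail-fast pattern: a single Promise is completed by the first Future that fails, or by the last one to succeed. Below is a minimal standalone sketch of the same pattern; the FailFastFanOut object, the runAll helper, and its timeout parameter are illustrative names, not part of the datasource:

import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.{Failure, Success}

object FailFastFanOut {
  // Run all tasks in parallel; fail as soon as any one fails, succeed only
  // after every task has succeeded. Tasks must be non-empty, mirroring
  // saveAll's early return when the row list is empty.
  def runAll[A](tasks: Seq[() => A], timeout: FiniteDuration): A = {
    val remaining = new AtomicInteger(tasks.length)
    val p = Promise[A]()
    tasks.map(t => Future(t())) foreach {
      _ onComplete {
        case Success(v) =>
          if (remaining.decrementAndGet() == 0) p.trySuccess(v)
        case Failure(e) => p.tryFailure(e)
      }
    }
    Await.result(p.future, timeout)
  }
}

Called as runAll(Seq(() => 1, () => 2), 10.seconds), this returns one of the results or throws the first failure - which is exactly how saveAll decides whether the bulk write as a whole succeeded.
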
http://git-wip-us.apache.org/repos/asf/incubator-livy-website/blob/f0d9a84f/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/common/JsonStoreRDD.scala
----------------------------------------------------------------------
diff --git a/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/common/JsonStoreRDD.scala b/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/common/JsonStoreRDD.scala
new file mode 100644
index 0000000..46774f5
--- /dev/null
+++ b/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/common/JsonStoreRDD.scala
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bahir.cloudant.common
+
+import org.slf4j.LoggerFactory
+
+import org.apache.spark.Partition
+import org.apache.spark.SparkContext
+import org.apache.spark.TaskContext
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.sources.Filter
+
+import org.apache.bahir.cloudant.CloudantConfig
+
+/**
+ * JsonStoreRDDPartition defines each partition as a subset of a query result:
+ * the number of rows to skip and the maximum number of rows to return.
+ */
+
+private[cloudant] class JsonStoreRDDPartition(val skip: Int, val limit: Int,
+ val idx: Int, val config: CloudantConfig,
+ val attrToFilters: Map[String, Array[Filter]])
+ extends Partition with Serializable {
+ val index = idx
+}
+
+/**
+ * JsonStoreRDD enables parallel reads, one partition per sub-query, for
+ * the data-access getAll (query by condition) scenarios.
+ * defaultPartitions: the intended number of partitions; the actual number
+ * is recalculated from the total row count and minInPartition /
+ * maxInPartition.
+ * maxRowsInPartition: -1 means unlimited.
+ */
+class JsonStoreRDD(sc: SparkContext, config: CloudantConfig,
+ url: String)(implicit requiredcolumns: Array[String] = null,
+ attrToFilters: Map[String, Array[Filter]] = null)
+ extends RDD[String](sc, Nil) {
+
+ lazy val totalRows = {
+ new JsonStoreDataAccess(config).getTotalRows(url)
+ }
+ lazy val totalPartition = {
+ if (totalRows == 0 || !config.allowPartition()) 1
+ else if (totalRows < config.partitions * config.minInPartition) {
+ val total = totalRows / config.minInPartition
+ if (total == 0) total + 1 else total
+ } else if (config.maxInPartition <= 0) config.partitions
+ else {
+ val total = totalRows / config.maxInPartition
+ if (totalRows % config.maxInPartition != 0) total + 1 else total
+ }
+ }
+
+ lazy val limitPerPartition = {
+ val limit = totalRows / totalPartition
+ if (totalRows % totalPartition != 0) {
+ limit + 1
+ } else {
+ limit
+ }
+ }
+
+ override def getPartitions: Array[Partition] = {
+ val logger = LoggerFactory.getLogger(getClass)
+ logger.info(s"Partition config - total=$totalPartition, " +
+ s"limit=$limitPerPartition for totalRows of $totalRows")
+
+ (0 until totalPartition).map(i => {
+ val skip = i * limitPerPartition
+ new JsonStoreRDDPartition(skip, limitPerPartition, i, config,
+ attrToFilters).asInstanceOf[Partition]
+ }).toArray
+ }
+
+ override def compute(splitIn: Partition, context: TaskContext):
+ Iterator[String] = {
+ val myPartition = splitIn.asInstanceOf[JsonStoreRDDPartition]
+ new JsonStoreDataAccess(myPartition.config).getIterator(myPartition.skip,
+ myPartition.limit, url)
+ }
+}
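
The sizing in totalPartition and limitPerPartition is plain integer arithmetic, so a worked example makes the behavior concrete. This sketch reimplements the two computations outside the RDD; the PartitionSizing object and the sample values (partitions = 10, minInPartition = 10, maxInPartition = 100) are made up for illustration, and the allowPartition() guard is omitted:

object PartitionSizing {
  // Mirrors JsonStoreRDD.totalPartition: at least one partition, capped at
  // `partitions` unless maxInPartition forces extra splits.
  def totalPartition(totalRows: Int, partitions: Int,
      minInPartition: Int, maxInPartition: Int): Int = {
    if (totalRows == 0) 1
    else if (totalRows < partitions * minInPartition) {
      math.max(totalRows / minInPartition, 1)
    } else if (maxInPartition <= 0) partitions
    else {
      val total = totalRows / maxInPartition
      if (totalRows % maxInPartition != 0) total + 1 else total
    }
  }

  // Mirrors JsonStoreRDD.limitPerPartition: ceiling division.
  def limitPerPartition(totalRows: Int, total: Int): Int =
    if (totalRows % total != 0) totalRows / total + 1 else totalRows / total

  def main(args: Array[String]): Unit = {
    // 55 rows, minInPartition = 10 -> 5 partitions of 11 rows each
    println(totalPartition(55, 10, 10, 100))   // 5
    println(limitPerPartition(55, 5))          // 11
    // 1050 rows, maxInPartition = 100 -> ceil(1050 / 100) = 11 partitions
    println(totalPartition(1050, 10, 10, 100)) // 11
  }
}
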
http://git-wip-us.apache.org/repos/asf/incubator-livy-website/blob/f0d9a84f/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/common/JsonUtil.scala
----------------------------------------------------------------------
diff --git a/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/common/JsonUtil.scala b/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/common/JsonUtil.scala
new file mode 100644
index 0000000..cd46b16
--- /dev/null
+++ b/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/common/JsonUtil.scala
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bahir.cloudant.common
+
+import scala.util.control.Breaks._
+
+import play.api.libs.json.JsValue
+
+object JsonUtil {
+ def getField(row: JsValue, field: String) : Option[JsValue] = {
+ val path = field.split('.')
+ var currentValue = row
+ var finalValue: Option[JsValue] = None
+ breakable {
+ for (i <- path.indices) {
+ val f: Option[JsValue] = (currentValue \ path(i)).toOption
+ f match {
+ case Some(f2) => currentValue = f2
+ case None => break
+ }
+ if (i == path.length - 1) {
+ // The leaf node
+ finalValue = Some(currentValue)
+ }
+ }
+ }
+ finalValue
+ }
+}
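
JsonUtil.getField walks a dotted path one segment at a time and returns None as soon as a segment is missing. A short usage sketch against a made-up document:

import play.api.libs.json.Json

object JsonUtilExample {
  def main(args: Array[String]): Unit = {
    val doc = Json.parse("""{"name": {"first": "Ada"}, "age": 36}""")
    println(JsonUtil.getField(doc, "name.first")) // Some("Ada")
    println(JsonUtil.getField(doc, "age"))        // Some(36)
    println(JsonUtil.getField(doc, "name.last"))  // None - missing leaf
  }
}
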