You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@livy.apache.org by lr...@apache.org on 2017/07/20 01:37:21 UTC
[35/50] [abbrv] incubator-livy-website git commit: [BAHIR-102] Initial support of Cloudant Query and examples
[BAHIR-102] Initial support of Cloudant Query and examples
Add an optimization that uses Cloudant Query (the _find endpoint) when filters are not on the primary key (_id).
Closes #41.
Project: http://git-wip-us.apache.org/repos/asf/incubator-livy-website/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-livy-website/commit/fd4c35fc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-livy-website/tree/fd4c35fc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-livy-website/diff/fd4c35fc
Branch: refs/heads/master
Commit: fd4c35fc9f7ebb57464d231cf5d66e7bc4096a1b
Parents: abfdc70
Author: Yang Lei <ge...@gmail.com>
Authored: Fri Apr 7 19:23:43 2017 -0400
Committer: Luciano Resende <lr...@apache.org>
Committed: Thu Apr 13 12:15:10 2017 -0700
----------------------------------------------------------------------
sql-cloudant/README.md | 2 +
sql-cloudant/examples/python/CloudantQuery.py | 65 ++++++++++
sql-cloudant/examples/python/CloudantQueryDF.py | 61 +++++++++
.../src/main/resources/application.conf | 2 +
.../apache/bahir/cloudant/CloudantConfig.scala | 94 ++++++--------
.../apache/bahir/cloudant/DefaultSource.scala | 32 +----
.../common/JsonStoreConfigManager.scala | 62 ++-------
.../cloudant/common/JsonStoreDataAccess.scala | 79 ++++++------
.../bahir/cloudant/common/JsonStoreRDD.scala | 129 ++++++++++++++++---
9 files changed, 338 insertions(+), 188 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-livy-website/blob/fd4c35fc/sql-cloudant/README.md
----------------------------------------------------------------------
diff --git a/sql-cloudant/README.md b/sql-cloudant/README.md
index eaa8893..38d2bbb 100644
--- a/sql-cloudant/README.md
+++ b/sql-cloudant/README.md
@@ -62,6 +62,8 @@ cloudant.protocol|https|protocol to use to transfer data: http or https
cloudant.host||cloudant host url
cloudant.username||cloudant userid
cloudant.password||cloudant password
+cloudant.useQuery|false|By default, the _all_docs endpoint is used if the 'view' and 'index' options (see below) are not set. When useQuery is enabled, the _find endpoint is used in place of _all_docs when the query condition is not on the primary key field (_id), so that query predicates can be pushed down into the datastore.
+cloudant.queryLimit|25|The maximum number of results returned when querying the _find endpoint.
jsonstore.rdd.partitions|10|the number of partitions intent used to drive JsonStoreRDD loading query result in parallel. The actual number is calculated based on total rows returned and satisfying maxInPartition and minInPartition
jsonstore.rdd.maxInPartition|-1|the max rows in a partition. -1 means unlimited
jsonstore.rdd.minInPartition|10|the min rows in a partition.
http://git-wip-us.apache.org/repos/asf/incubator-livy-website/blob/fd4c35fc/sql-cloudant/examples/python/CloudantQuery.py
----------------------------------------------------------------------
diff --git a/sql-cloudant/examples/python/CloudantQuery.py b/sql-cloudant/examples/python/CloudantQuery.py
new file mode 100644
index 0000000..5ca5c44
--- /dev/null
+++ b/sql-cloudant/examples/python/CloudantQuery.py
@@ -0,0 +1,65 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import pprint
+from pyspark.sql import SparkSession
+
+# define cloudant related configuration
+# set protocol to http if needed, default value=https
+# config("cloudant.protocol","http")
+spark = SparkSession\
+ .builder\
+ .appName("Cloudant Spark SQL Example in Python using query")\
+ .config("cloudant.host","ACCOUNT.cloudant.com")\
+ .config("cloudant.username", "USERNAME")\
+ .config("cloudant.password","PASSWORD")\
+ .config("jsonstore.rdd.partitions", 8)\
+ .config("cloudant.useQuery", "true")\
+ .config("schemaSampleSize",1)\
+ .getOrCreate()
+
+
+spark.sql(" CREATE TEMPORARY VIEW airportTable1 USING org.apache.bahir.cloudant OPTIONS ( database 'n_airportcodemapping')")
+airportData = spark.sql("SELECT _id, airportName FROM airportTable1 WHERE airportName == 'Moscow' ")
+airportData.printSchema()
+print 'Total # of rows in airportData: ' + str(airportData.count())
+airportData.show()
+
+spark.sql(" CREATE TEMPORARY VIEW airportTable2 USING org.apache.bahir.cloudant OPTIONS ( database 'n_airportcodemapping')")
+airportData = spark.sql("SELECT _id, airportName FROM airportTable2 WHERE airportName > 'Moscow' ORDER BY _id")
+airportData.printSchema()
+print 'Total # of rows in airportData: ' + str(airportData.count())
+airportData.show()
+
+spark.sql(" CREATE TEMPORARY VIEW airportTable3 USING org.apache.bahir.cloudant OPTIONS ( database 'n_airportcodemapping')")
+airportData = spark.sql("SELECT _id, airportName FROM airportTable3 WHERE airportName > 'Moscow' AND airportName < 'Sydney' ORDER BY _id")
+airportData.printSchema()
+print 'Total # of rows in airportData: ' + str(airportData.count())
+airportData.show()
+
+spark.sql(" CREATE TEMPORARY VIEW flight1 USING org.apache.bahir.cloudant OPTIONS ( database 'n_flight')")
+flightData = spark.sql("SELECT flightSegmentId, economyClassBaseCost, numFirstClassSeats FROM flight1 WHERE economyClassBaseCost >=200 AND numFirstClassSeats<=10")
+flightData.printSchema()
+print 'Total # of rows in airportData: ' + str(flightData.count())
+flightData.show()
+
+spark.sql(" CREATE TEMPORARY VIEW flight2 USING org.apache.bahir.cloudant OPTIONS ( database 'n_flight')")
+flightData = spark.sql("SELECT flightSegmentId, scheduledDepartureTime, scheduledArrivalTime FROM flight2 WHERE scheduledDepartureTime >='2014-12-15T05:00:00.000Z' AND scheduledArrivalTime <='2014-12-15T11:04:00.000Z'")
+flightData.printSchema()
+print 'Total # of rows in airportData: ' + str(flightData.count())
+flightData.show()
+
+
http://git-wip-us.apache.org/repos/asf/incubator-livy-website/blob/fd4c35fc/sql-cloudant/examples/python/CloudantQueryDF.py
----------------------------------------------------------------------
diff --git a/sql-cloudant/examples/python/CloudantQueryDF.py b/sql-cloudant/examples/python/CloudantQueryDF.py
new file mode 100644
index 0000000..c8fa296
--- /dev/null
+++ b/sql-cloudant/examples/python/CloudantQueryDF.py
@@ -0,0 +1,61 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import pprint
+from pyspark.sql import SparkSession
+
+# define cloudant related configuration
+# set protocol to http if needed, default value=https
+# config("cloudant.protocol","http")
+spark = SparkSession\
+ .builder\
+ .appName("Cloudant Spark SQL Example in Python using query")\
+ .config("cloudant.host","ACCOUNT.cloudant.com")\
+ .config("cloudant.username", "USERNAME")\
+ .config("cloudant.password","PASSWORD")\
+ .config("jsonstore.rdd.partitions", 8)\
+ .config("cloudant.useQuery", "true")\
+ .config("schemaSampleSize",1)\
+ .getOrCreate()
+
+
+# ***0. Loading dataframe from Cloudant db with one String field condition
+df = spark.read.load("n_airportcodemapping", "org.apache.bahir.cloudant")
+df.printSchema()
+df.filter(df.airportName == 'Moscow').select("_id",'airportName').show()
+
+
+# ***1. Loading dataframe from Cloudant db with one String field condition
+df = spark.read.load("n_airportcodemapping", "org.apache.bahir.cloudant")
+df.printSchema()
+df.filter(df.airportName > 'Moscow').select("_id",'airportName').show()
+
+# ***2. Loading dataframe from Cloudant db with two String field condition
+df = spark.read.load("n_airportcodemapping", "org.apache.bahir.cloudant")
+df.printSchema()
+df.filter(df.airportName > 'Moscow').filter(df.airportName < 'Sydney').select("_id",'airportName').show()
+
+# ***3. Loading dataframe from Cloudant db with two int field condition
+df = spark.read.load("n_flight", "org.apache.bahir.cloudant")
+df.printSchema()
+df.filter(df.economyClassBaseCost >= 200).filter(df.numFirstClassSeats <=10).select('flightSegmentId','scheduledDepartureTime', 'scheduledArrivalTime').show()
+
+# ***4. Loading dataframe from Cloudant db with two timestamp field condition
+df = spark.read.load("n_flight", "org.apache.bahir.cloudant")
+df.printSchema()
+df.filter(df.scheduledDepartureTime >= "2014-12-15T05:00:00.000Z").filter(df.scheduledArrivalTime <="2014-12-15T11:04:00.000Z").select('flightSegmentId','scheduledDepartureTime', 'scheduledArrivalTime').show()
+
+
http://git-wip-us.apache.org/repos/asf/incubator-livy-website/blob/fd4c35fc/sql-cloudant/src/main/resources/application.conf
----------------------------------------------------------------------
diff --git a/sql-cloudant/src/main/resources/application.conf b/sql-cloudant/src/main/resources/application.conf
index 2d8b236..80dea91 100644
--- a/sql-cloudant/src/main/resources/application.conf
+++ b/sql-cloudant/src/main/resources/application.conf
@@ -10,5 +10,7 @@ spark-sql {
}
cloudant = {
protocol = https
+ useQuery = false
+ queryLimit = 25
}
}
http://git-wip-us.apache.org/repos/asf/incubator-livy-website/blob/fd4c35fc/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/CloudantConfig.scala
----------------------------------------------------------------------
diff --git a/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/CloudantConfig.scala b/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/CloudantConfig.scala
index ac14f4b..c4e27b9 100644
--- a/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/CloudantConfig.scala
+++ b/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/CloudantConfig.scala
@@ -34,20 +34,16 @@ class CloudantConfig(val protocol: String, val host: String,
(implicit val username: String, val password: String,
val partitions: Int, val maxInPartition: Int, val minInPartition: Int,
val requestTimeout: Long, val bulkSize: Int, val schemaSampleSize: Int,
- val createDBOnSave: Boolean, val selector: String)
+ val createDBOnSave: Boolean, val selector: String, val useQuery: Boolean = false,
+ val queryLimit: Int)
extends Serializable{
- private val SCHEMA_FOR_ALL_DOCS_NUM = -1
private lazy val dbUrl = {protocol + "://" + host + "/" + dbName}
val pkField = "_id"
val defaultIndex = "_all_docs" // "_changes" does not work for partition
val default_filter: String = "*:*"
- def getChangesUrl(): String = {
- dbUrl + "/_changes?include_docs=true&feed=normal"
- }
-
def getContinuousChangesUrl(): String = {
var url = dbUrl + "/_changes?include_docs=true&feed=continuous&heartbeat=3000"
if (selector != null) {
@@ -64,11 +60,6 @@ class CloudantConfig(val protocol: String, val host: String,
dbUrl
}
- def getLastUrl(skip: Int): String = {
- if (skip ==0 ) null
- else s"$dbUrl/$defaultIndex?limit=$skip"
- }
-
def getSchemaSampleSize(): Int = {
schemaSampleSize
}
@@ -77,8 +68,6 @@ class CloudantConfig(val protocol: String, val host: String,
createDBOnSave
}
- def getLastNum(result: JsValue): JsValue = (result \ "last_seq").get
-
def getTotalUrl(url: String): String = {
if (url.contains('?')) {
url + "&limit=1"
@@ -91,37 +80,24 @@ class CloudantConfig(val protocol: String, val host: String,
dbName
}
- def allowPartition(): Boolean = {indexName==null}
+ def queryEnabled(): Boolean = {useQuery && indexName==null && viewName==null}
- def getOneUrl(): String = {
- dbUrl + "/_all_docs?limit=1&include_docs=true"
- }
+ def allowPartition(queryUsed: Boolean): Boolean = {indexName==null && !queryUsed}
- def getOneUrlExcludeDDoc1(): String = {
- dbUrl + "/_all_docs?endkey=%22_design/%22&limit=1&include_docs=true"
- }
+ def getAllDocsUrl(limit: Int, excludeDDoc: Boolean = false): String = {
- def getOneUrlExcludeDDoc2(): String = {
- dbUrl + "/_all_docs?startkey=%22_design0/%22&limit=1&include_docs=true"
- }
-
- def getAllDocsUrlExcludeDDoc(limit: Int): String = {
if (viewName == null) {
- dbUrl + "/_all_docs?startkey=%22_design0/%22&limit=" + limit + "&include_docs=true"
- } else {
- dbUrl + "/" + viewName + "?limit=1"
- }
- }
-
- def getAllDocsUrl(limit: Int): String = {
- if (viewName == null) {
- if (limit == SCHEMA_FOR_ALL_DOCS_NUM) {
- dbUrl + "/_all_docs?include_docs=true"
+ val baseUrl = (
+ if ( excludeDDoc) dbUrl + "/_all_docs?startkey=%22_design0/%22&include_docs=true"
+ else dbUrl + "/_all_docs?include_docs=true"
+ )
+ if (limit == JsonStoreConfigManager.ALL_DOCS_LIMIT) {
+ baseUrl
} else {
- dbUrl + "/_all_docs?limit=" + limit + "&include_docs=true"
+ baseUrl + "&limit=" + limit
}
} else {
- if (limit == JsonStoreConfigManager.SCHEMA_FOR_ALL_DOCS_NUM) {
+ if (limit == JsonStoreConfigManager.ALL_DOCS_LIMIT) {
dbUrl + "/" + viewName
} else {
dbUrl + "/" + viewName + "?limit=" + limit
@@ -132,22 +108,23 @@ class CloudantConfig(val protocol: String, val host: String,
def getRangeUrl(field: String = null, start: Any = null,
startInclusive: Boolean = false, end: Any = null,
endInclusive: Boolean = false,
- includeDoc: Boolean = true): (String, Boolean) = {
- val (url: String, pusheddown: Boolean) =
- calculate(field, start, startInclusive, end, endInclusive)
- if (includeDoc) {
+ includeDoc: Boolean = true,
+ allowQuery: Boolean = false): (String, Boolean, Boolean) = {
+ val (url: String, pusheddown: Boolean, queryUsed: Boolean) =
+ calculate(field, start, startInclusive, end, endInclusive, allowQuery)
+ if (includeDoc && !queryUsed ) {
if (url.indexOf('?') > 0) {
- (url + "&include_docs=true", pusheddown)
+ (url + "&include_docs=true", pusheddown, queryUsed)
} else {
- (url + "?include_docs=true", pusheddown)
+ (url + "?include_docs=true", pusheddown, queryUsed)
}
} else {
- (url, pusheddown)
+ (url, pusheddown, queryUsed)
}
}
private def calculate(field: String, start: Any, startInclusive: Boolean,
- end: Any, endInclusive: Boolean): (String, Boolean) = {
+ end: Any, endInclusive: Boolean, allowQuery: Boolean): (String, Boolean, Boolean) = {
if (field != null && field.equals(pkField)) {
var condition = ""
if (start != null && end != null && start.equals(end)) {
@@ -166,16 +143,18 @@ class CloudantConfig(val protocol: String, val host: String,
condition += "endkey=%22" + URLEncoder.encode(end.toString(), "UTF-8") + "%22"
}
}
- (dbUrl + "/_all_docs" + condition, true)
+ (dbUrl + "/_all_docs" + condition, true, false)
} else if (indexName!=null) {
// push down to indexName
val condition = calculateCondition(field, start, startInclusive,
end, endInclusive)
- (dbUrl + "/" + indexName + "?q=" + condition, true)
+ (dbUrl + "/" + indexName + "?q=" + condition, true, false)
} else if (viewName != null) {
- (dbUrl + "/" + viewName, true)
+ (dbUrl + "/" + viewName, false, false)
+ } else if (allowQuery && useQuery) {
+ (s"$dbUrl/_find", false, true)
} else {
- (s"$dbUrl/$defaultIndex", false)
+ (s"$dbUrl/$defaultIndex", false, false)
}
}
@@ -215,20 +194,21 @@ class CloudantConfig(val protocol: String, val host: String,
}
}
- def getSubSetUrl (url: String, skip: Int, limit: Int)
- (implicit convertSkip: (Int) => String): String = {
+ def getSubSetUrl (url: String, skip: Int, limit: Int, queryUsed: Boolean): String = {
val suffix = {
if (url.indexOf("_all_docs")>0) "include_docs=true&limit=" +
limit + "&skip=" + skip
- else if (url.indexOf("_changes")>0) "include_docs=true&limit=" +
- limit + "&since=" + convertSkip(skip)
else if (viewName != null) {
"limit=" + limit + "&skip=" + skip
+ } else if (queryUsed) {
+ ""
} else {
"include_docs=true&limit=" + limit
} // TODO Index query does not support subset query. Should disable Partitioned loading?
}
- if (url.indexOf('?') > 0) {
+ if (suffix.length==0) {
+ url
+ } else if (url.indexOf('?') > 0) {
url + "&" + suffix
}
else {
@@ -246,8 +226,10 @@ class CloudantConfig(val protocol: String, val host: String,
}
}
- def getRows(result: JsValue): Seq[JsValue] = {
- if (viewName == null) {
+ def getRows(result: JsValue, queryUsed: Boolean): Seq[JsValue] = {
+ if ( queryUsed ) {
+ ((result \ "docs").as[JsArray]).value.map(row => row)
+ } else if ( viewName == null) {
((result \ "rows").as[JsArray]).value.map(row => (row \ "doc").get)
} else {
((result \ "rows").as[JsArray]).value.map(row => row)
http://git-wip-us.apache.org/repos/asf/incubator-livy-website/blob/fd4c35fc/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/DefaultSource.scala
----------------------------------------------------------------------
diff --git a/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/DefaultSource.scala b/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/DefaultSource.scala
index 4c973f7..deab22a 100644
--- a/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/DefaultSource.scala
+++ b/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/DefaultSource.scala
@@ -23,7 +23,7 @@ import org.apache.spark.sql._
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types._
-import org.apache.bahir.cloudant.common.{FilterInterpreter, JsonStoreDataAccess, JsonStoreRDD, _}
+import org.apache.bahir.cloudant.common.{JsonStoreDataAccess, JsonStoreRDD, _}
case class CloudantReadWriteRelation (config: CloudantConfig,
schema: StructType,
@@ -49,23 +49,11 @@ case class CloudantReadWriteRelation (config: CloudantConfig,
allDocsDF.select(requiredColumns(0), colsExceptCol0: _*).rdd
}
} else {
- val filterInterpreter = new FilterInterpreter(filters)
- var searchField: String = {
- if (filterInterpreter.containsFiltersFor(config.pkField)) {
- config.pkField
- } else {
- filterInterpreter.firstField
- }
- }
-
- val (min, minInclusive, max, maxInclusive) = filterInterpreter.getInfo(searchField)
- implicit val columns = requiredColumns
- val (url: String, pusheddown: Boolean) = config.getRangeUrl(searchField,
- min, minInclusive, max, maxInclusive, false)
- if (!pusheddown) searchField = null
- implicit val attrToFilters = filterInterpreter.getFiltersForPostProcess(searchField)
+ implicit val columns : Array[String] = requiredColumns
+ implicit val origFilters : Array[Filter] = filters
- val cloudantRDD = new JsonStoreRDD(sqlContext.sparkContext, config, url)
+ logger.info("buildScan:" + columns + "," + origFilters)
+ val cloudantRDD = new JsonStoreRDD(sqlContext.sparkContext, config)
val df = sqlContext.read.json(cloudantRDD)
if (colsLength > 1) {
val colsExceptCol0 = for (i <- 1 until colsLength) yield requiredColumns(i)
@@ -117,16 +105,10 @@ class DefaultSource extends RelationProvider
inSchema
} else {
val df = if (config.getSchemaSampleSize() ==
- JsonStoreConfigManager.SCHEMA_FOR_ALL_DOCS_NUM &&
+ JsonStoreConfigManager.ALL_DOCS_LIMIT &&
config.viewName == null
&& config.indexName == null) {
- val filterInterpreter = new FilterInterpreter(null)
- var searchField = null
- val (min, minInclusive, max, maxInclusive) =
- filterInterpreter.getInfo(searchField)
- val (url: String, pusheddown: Boolean) = config.getRangeUrl(searchField,
- min, minInclusive, max, maxInclusive, false)
- val cloudantRDD = new JsonStoreRDD(sqlContext.sparkContext, config, url)
+ val cloudantRDD = new JsonStoreRDD(sqlContext.sparkContext, config)
allDocsDF = sqlContext.read.json(cloudantRDD)
allDocsDF
} else {
http://git-wip-us.apache.org/repos/asf/incubator-livy-website/blob/fd4c35fc/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/common/JsonStoreConfigManager.scala
----------------------------------------------------------------------
diff --git a/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/common/JsonStoreConfigManager.scala b/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/common/JsonStoreConfigManager.scala
index 92192bb..38c5006 100644
--- a/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/common/JsonStoreConfigManager.scala
+++ b/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/common/JsonStoreConfigManager.scala
@@ -25,13 +25,14 @@ import org.apache.bahir.cloudant.CloudantConfig
object JsonStoreConfigManager {
val CLOUDANT_CONNECTOR_VERSION = "2.0.0"
- val SCHEMA_FOR_ALL_DOCS_NUM = -1
+ val ALL_DOCS_LIMIT = -1
private val CLOUDANT_HOST_CONFIG = "cloudant.host"
private val CLOUDANT_USERNAME_CONFIG = "cloudant.username"
private val CLOUDANT_PASSWORD_CONFIG = "cloudant.password"
private val CLOUDANT_PROTOCOL_CONFIG = "cloudant.protocol"
-
+ private val USE_QUERY_CONFIG = "cloudant.useQuery"
+ private val QUERY_LIMIT_CONFIG = "cloudant.queryLimit"
private val PARTITION_CONFIG = "jsonstore.rdd.partitions"
private val MAX_IN_PARTITION_CONFIG = "jsonstore.rdd.maxInPartition"
@@ -39,7 +40,7 @@ import org.apache.bahir.cloudant.CloudantConfig
private val REQUEST_TIMEOUT_CONFIG = "jsonstore.rdd.requestTimeout"
private val BULK_SIZE_CONFIG = "bulkSize"
private val SCHEMA_SAMPLE_SIZE_CONFIG = "schemaSampleSize"
- private val CREATE_DB_ON_SAVE = "createDBOnSave"
+ private val CREATE_DB_ON_SAVE_CONFIG = "createDBOnSave"
private val configFactory = ConfigFactory.load()
@@ -139,6 +140,10 @@ import org.apache.bahir.cloudant.CloudantConfig
def getConfig(context: SQLContext, parameters: Map[String, String]): CloudantConfig = {
val sparkConf = context.sparkContext.getConf
+ getConfig(sparkConf, parameters)
+ }
+
+ def getConfig (sparkConf: SparkConf, parameters: Map[String, String]): CloudantConfig = {
implicit val total = getInt(sparkConf, parameters, PARTITION_CONFIG)
implicit val max = getInt(sparkConf, parameters, MAX_IN_PARTITION_CONFIG)
@@ -146,67 +151,28 @@ import org.apache.bahir.cloudant.CloudantConfig
implicit val requestTimeout = getLong(sparkConf, parameters, REQUEST_TIMEOUT_CONFIG)
implicit val bulkSize = getInt(sparkConf, parameters, BULK_SIZE_CONFIG)
implicit val schemaSampleSize = getInt(sparkConf, parameters, SCHEMA_SAMPLE_SIZE_CONFIG)
- implicit val createDBOnSave = getBool(sparkConf, parameters, CREATE_DB_ON_SAVE)
+ implicit val createDBOnSave = getBool(sparkConf, parameters, CREATE_DB_ON_SAVE_CONFIG)
+
+ implicit val useQuery = getBool(sparkConf, parameters, USE_QUERY_CONFIG)
+ implicit val queryLimit = getInt(sparkConf, parameters, QUERY_LIMIT_CONFIG)
val dbName = parameters.getOrElse("database", parameters.getOrElse("path", null))
val indexName = parameters.getOrElse("index", null)
val viewName = parameters.getOrElse("view", null)
-
- // FIXME: Add logger
- // scalastyle:off println
- println(s"Use connectorVersion=$CLOUDANT_CONNECTOR_VERSION, dbName=$dbName, " +
- s"indexName=$indexName, viewName=$viewName," +
- s"$PARTITION_CONFIG=$total, $MAX_IN_PARTITION_CONFIG=$max," +
- s"$MIN_IN_PARTITION_CONFIG=$min, $REQUEST_TIMEOUT_CONFIG=$requestTimeout," +
- s"$BULK_SIZE_CONFIG=$bulkSize, $SCHEMA_SAMPLE_SIZE_CONFIG=$schemaSampleSize")
- // scalastyle:on println
+ val selector = parameters.getOrElse("selector", null)
val protocol = getString(sparkConf, parameters, CLOUDANT_PROTOCOL_CONFIG)
val host = getString( sparkConf, parameters, CLOUDANT_HOST_CONFIG)
val user = getString(sparkConf, parameters, CLOUDANT_USERNAME_CONFIG)
val passwd = getString(sparkConf, parameters, CLOUDANT_PASSWORD_CONFIG)
- val selector = getString(sparkConf, parameters, "selector")
if (host != null) {
new CloudantConfig(protocol, host, dbName, indexName,
viewName) (user, passwd, total, max, min, requestTimeout, bulkSize,
- schemaSampleSize, createDBOnSave, selector)
+ schemaSampleSize, createDBOnSave, selector, useQuery, queryLimit)
} else {
throw new RuntimeException("Spark configuration is invalid! " +
"Please make sure to supply required values for cloudant.host.")
}
}
-
- def getConfig(sparkConf: SparkConf, parameters: Map[String, String]): CloudantConfig = {
-
- implicit val total = getInt(sparkConf, parameters, PARTITION_CONFIG)
- implicit val max = getInt(sparkConf, parameters, MAX_IN_PARTITION_CONFIG)
- implicit val min = getInt(sparkConf, parameters, MIN_IN_PARTITION_CONFIG)
- implicit val requestTimeout = getLong(sparkConf, parameters, REQUEST_TIMEOUT_CONFIG)
- implicit val bulkSize = getInt(sparkConf, parameters, BULK_SIZE_CONFIG)
- implicit val schemaSampleSize = getInt(sparkConf, parameters, SCHEMA_SAMPLE_SIZE_CONFIG)
- implicit val createDBOnSave = getBool(sparkConf, parameters, CREATE_DB_ON_SAVE)
-
- val dbName = parameters.getOrElse("database", null)
-
- // scalastyle:off println
- println(s"Use connectorVersion=$CLOUDANT_CONNECTOR_VERSION, dbName=$dbName, " +
- s"$REQUEST_TIMEOUT_CONFIG=$requestTimeout")
- // scalastyle:on println
-
- val protocol = getString(sparkConf, parameters, CLOUDANT_PROTOCOL_CONFIG)
- val host = getString( sparkConf, parameters, CLOUDANT_HOST_CONFIG)
- val user = getString(sparkConf, parameters, CLOUDANT_USERNAME_CONFIG)
- val passwd = getString(sparkConf, parameters, CLOUDANT_PASSWORD_CONFIG)
- val selector = getString(sparkConf, parameters, "selector")
-
- if (host != null) {
- new CloudantConfig(protocol, host, dbName)(user, passwd,
- total, max, min, requestTimeout, bulkSize,
- schemaSampleSize, createDBOnSave, selector)
- } else {
- throw new RuntimeException("Cloudant parameters are invalid!" +
- "Please make sure to supply required values for cloudant.host.")
- }
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-livy-website/blob/fd4c35fc/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/common/JsonStoreDataAccess.scala
----------------------------------------------------------------------
diff --git a/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/common/JsonStoreDataAccess.scala b/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/common/JsonStoreDataAccess.scala
index e84a44c..ac79359 100644
--- a/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/common/JsonStoreDataAccess.scala
+++ b/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/common/JsonStoreDataAccess.scala
@@ -39,19 +39,6 @@ class JsonStoreDataAccess (config: CloudantConfig) {
lazy val logger = LoggerFactory.getLogger(getClass)
implicit lazy val timeout = config.requestTimeout
- def getOne()( implicit columns: Array[String] = null): Seq[String] = {
- var r = this.getQueryResult[Seq[String]](config.getOneUrlExcludeDDoc1(), processAll)
- if (r.size == 0 ) {
- r = this.getQueryResult[Seq[String]](config.getOneUrlExcludeDDoc2(), processAll)
- }
- if (r.size == 0) {
- throw new RuntimeException("Database " + config.getDbname() +
- " doesn't have any non-design documents!")
- } else {
- r
- }
- }
-
def getMany(limit: Int)(implicit columns: Array[String] = null): Seq[String] = {
if (limit == 0) {
throw new RuntimeException("Database " + config.getDbname() +
@@ -63,7 +50,7 @@ class JsonStoreDataAccess (config: CloudantConfig) {
}
var r = this.getQueryResult[Seq[String]](config.getAllDocsUrl(limit), processAll)
if (r.size == 0) {
- r = this.getQueryResult[Seq[String]](config.getAllDocsUrlExcludeDDoc(limit), processAll)
+ r = this.getQueryResult[Seq[String]](config.getAllDocsUrl(limit, true), processAll)
}
if (r.size == 0) {
throw new RuntimeException("Database " + config.getDbname() +
@@ -74,40 +61,34 @@ class JsonStoreDataAccess (config: CloudantConfig) {
}
def getAll[T](url: String)
- (implicit columns: Array[String] = null,
- attrToFilters: Map[String, Array[Filter]] = null): Seq[String] = {
+ (implicit columns: Array[String] = null): Seq[String] = {
this.getQueryResult[Seq[String]](url, processAll)
}
def getIterator(skip: Int, limit: Int, url: String)
(implicit columns: Array[String] = null,
- attrToFilters: Map[String, Array[Filter]] = null): Iterator[String] = {
- implicit def convertSkip(skip: Int): String = {
- val url = config.getLastUrl(skip)
- if (url == null) {
- skip.toString()
- } else {
- this.getQueryResult[String](url,
- { result => config.getLastNum(Json.parse(result)).as[JsString].value})
- }
- }
- val newUrl = config.getSubSetUrl(url, skip, limit)
+ postData: String = null): Iterator[String] = {
+ val newUrl = config.getSubSetUrl(url, skip, limit, postData!=null)
this.getQueryResult[Iterator[String]](newUrl, processIterator)
}
- def getTotalRows(url: String): Int = {
- val totalUrl = config.getTotalUrl(url)
- this.getQueryResult[Int](totalUrl,
- { result => config.getTotalRows(Json.parse(result))})
+ def getTotalRows(url: String, queryUsed: Boolean)
+ (implicit postData: String = null): Int = {
+ if (queryUsed) config.queryLimit // Query can not retrieve total row now.
+ else {
+ val totalUrl = config.getTotalUrl(url)
+ this.getQueryResult[Int](totalUrl,
+ { result => config.getTotalRows(Json.parse(result))})
+ }
}
private def processAll (result: String)
(implicit columns: Array[String],
- attrToFilters: Map[String, Array[Filter]] = null) = {
- logger.debug(s"processAll columns:$columns, attrToFilters:$attrToFilters")
+ postData: String = null) = {
+ logger.debug(s"processAll:$result, columns:$columns")
val jsonResult: JsValue = Json.parse(result)
- var rows = config.getRows(jsonResult)
- if (config.viewName == null) {
+ var rows = config.getRows(jsonResult, postData!=null )
+ if (config.viewName == null && postData==null) {
// filter design docs
rows = rows.filter(r => FilterDDocs.filter(r))
}
@@ -116,7 +97,7 @@ class JsonStoreDataAccess (config: CloudantConfig) {
private def processIterator (result: String)
(implicit columns: Array[String],
- attrToFilters: Map[String, Array[Filter]] = null): Iterator[String] = {
+ postData: String = null): Iterator[String] = {
processAll(result).iterator
}
@@ -137,23 +118,39 @@ class JsonStoreDataAccess (config: CloudantConfig) {
getQueryResult(url, processResults)
}
-
private def getQueryResult[T]
(url: String, postProcessor: (String) => T)
(implicit columns: Array[String] = null,
- attrToFilters: Map[String, Array[Filter]] = null) : T = {
- logger.warn("Loading data from Cloudant using query: " + url)
+ postData: String = null) : T = {
+ logger.info(s"Loading data from Cloudant using: $url , postData: $postData")
val requestTimeout = config.requestTimeout.toInt
val clRequest: HttpRequest = config.username match {
case null =>
- Http(url)
+ if (postData!=null) {
+ Http(url)
+ .postData(postData)
+ .timeout(connTimeoutMs = 1000, readTimeoutMs = requestTimeout)
+ .header("Content-Type", "application/json")
+ .header("User-Agent", "spark-cloudant")
+ } else {
+ Http(url)
.timeout(connTimeoutMs = 1000, readTimeoutMs = requestTimeout)
.header("User-Agent", "spark-cloudant")
+ }
case _ =>
- Http(url)
+ if (postData!=null) {
+ Http(url)
+ .postData(postData)
+ .timeout(connTimeoutMs = 1000, readTimeoutMs = requestTimeout)
+ .header("Content-Type", "application/json")
+ .header("User-Agent", "spark-cloudant")
+ .auth(config.username, config.password)
+ } else {
+ Http(url)
.timeout(connTimeoutMs = 1000, readTimeoutMs = requestTimeout)
.header("User-Agent", "spark-cloudant")
.auth(config.username, config.password)
+ }
}
val clResponse: HttpResponse[String] = clRequest.execute()
http://git-wip-us.apache.org/repos/asf/incubator-livy-website/blob/fd4c35fc/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/common/JsonStoreRDD.scala
----------------------------------------------------------------------
diff --git a/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/common/JsonStoreRDD.scala b/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/common/JsonStoreRDD.scala
index 46774f5..46ba912 100644
--- a/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/common/JsonStoreRDD.scala
+++ b/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/common/JsonStoreRDD.scala
@@ -17,12 +17,13 @@
package org.apache.bahir.cloudant.common
import org.slf4j.LoggerFactory
+import play.api.libs.json.{JsNull, Json, JsString, JsValue}
import org.apache.spark.Partition
import org.apache.spark.SparkContext
import org.apache.spark.TaskContext
import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.sources.Filter
+import org.apache.spark.sql.sources._
import org.apache.bahir.cloudant.CloudantConfig
@@ -31,9 +32,11 @@ import org.apache.bahir.cloudant.CloudantConfig
* the limit rows returns and the skipped rows.
*/
-private[cloudant] class JsonStoreRDDPartition(val skip: Int, val limit: Int,
- val idx: Int, val config: CloudantConfig,
- val attrToFilters: Map[String, Array[Filter]])
- extends Partition with Serializable{
- val index = idx
+private[cloudant] class JsonStoreRDDPartition(
+    val url: String, val skip: Int, val limit: Int, val idx: Int, val config: CloudantConfig,
+    val selector: JsValue, val fields: JsValue, val queryUsed: Boolean)
+  extends Partition with Serializable {
+  // selector and fields may be null; compute() only puts them in a request when queryUsed is set.
+  override val index: Int = idx
}
@@ -46,16 +47,16 @@ private[cloudant] class JsonStoreRDDPartition(val skip: Int, val limit: Int,
* and minInPartition / maxInPartition )
* maxRowsInPartition: -1 means unlimited
*/
-class JsonStoreRDD(sc: SparkContext, config: CloudantConfig,
- url: String)(implicit requiredcolumns: Array[String] = null,
- attrToFilters: Map[String, Array[Filter]] = null)
+class JsonStoreRDD(sc: SparkContext, config: CloudantConfig)
+    (implicit requiredcolumns: Array[String] = null,
+    filters: Array[Filter] = null)
extends RDD[String](sc, Nil) {
- lazy val totalRows = {
- new JsonStoreDataAccess(config).getTotalRows(url)
- }
- lazy val totalPartition = {
- if (totalRows == 0 || ! config.allowPartition() ) 1
+  // @transient lazy: the logger is not serialized along with the RDD that is shipped to
+  // executors; it is created on first use wherever it is needed.
+  @transient private lazy val logger = LoggerFactory.getLogger(getClass)
+
+  private def getTotalPartition(totalRows: Int, queryUsed: Boolean): Int = {
+    if (totalRows == 0 || !config.allowPartition(queryUsed)) 1
else if (totalRows < config.partitions * config.minInPartition) {
val total = totalRows / config.minInPartition
if (total == 0 ) {
@@ -76,7 +76,7 @@ class JsonStoreRDD(sc: SparkContext, config: CloudantConfig,
}
}
- lazy val limitPerPartition = {
+ private def getLimitPerPartition(totalRows: Int, totalPartition: Int): Int = { // rows per partition: the quotient, plus 1 if there is a remainder
val limit = totalRows/totalPartition
if (totalRows % totalPartition != 0) {
limit + 1
@@ -85,22 +85,136 @@ class JsonStoreRDD(sc: SparkContext, config: CloudantConfig,
}
}
+  /**
+   * Translates one Spark comparison filter into a Cloudant Query (Mango) operator/value pair,
+   * e.g. GreaterThan("age", 5) becomes ("$gt", 5). The attribute itself is not returned:
+   * callers group filters by attribute before calling this.
+   *
+   * @return (operator, value); the value is null when the filter type or the value's type
+   *         is not supported, so callers can drop the pair
+   */
+  private def convertToMangoJson(f: Filter): (String, JsValue) = {
+    val (op, value): (String, Any) = f match {
+      case EqualTo(_, v) => ("$eq", v)
+      case GreaterThan(_, v) => ("$gt", v)
+      case LessThan(_, v) => ("$lt", v)
+      case GreaterThanOrEqual(_, v) => ("$gte", v)
+      case LessThanOrEqual(_, v) => ("$lte", v)
+      case _ => (null, null)
+    }
+    val convertedV: JsValue = {
+      // TODO Better handling of other types
+      if (value != null) {
+        value match {
+          case s: String => Json.toJson(s)
+          case l: Long => Json.toJson(l)
+          case d: Double => Json.toJson(d)
+          case i: Int => Json.toJson(i)
+          case b: Boolean => Json.toJson(b)
+          case t: java.sql.Timestamp => Json.toJson(t)
+          case other =>
+            // Log the whole filter so the message identifies the attribute being skipped
+            logger.debug(s"Ignore filter:$f, cannot handle its datatype: $other")
+            null
+        }
+      } else null
+    }
+    (op, convertedV)
+  }
+
+  /**
+   * Translates all filters on one attribute into a Mango operator map such as
+   * {"$gt": 1, "$lt": 9}, dropping filters that cannot be translated.
+   * NOTE(review): toMap keeps only the last value when two filters share an operator.
+   */
+  private def convertAttrToMangoJson(filters: Array[Filter]): Map[String, JsValue] = {
+    filters.map(af => convertToMangoJson(af))
+      .filter(x => x._2 != null)
+      .toMap
+  }
+
override def getPartitions: Array[Partition] = {
- val logger = LoggerFactory.getLogger(getClass)
+    // mkString so the arrays' contents, not their identity hashes, end up in the log
+    logger.info("getPartitions:" + Option(requiredcolumns).map(_.mkString("[", ",", "]")).orNull +
+      "," + Option(filters).map(_.mkString("[", ",", "]")).orNull)
+
+    val filterInterpreter = new FilterInterpreter(filters)
+    val origAttrToFilters = (if (filters == null || filters.length == 0) null
+      else filterInterpreter.getFiltersForPostProcess(null))
+
+    // A Cloudant Query selector (and the fields to return) is only built when queries are
+    // enabled and at least one filter could be translated into Mango syntax.
+    val (selector, fields): (JsValue, JsValue) = {
+      if (!config.queryEnabled() || origAttrToFilters == null) (null, null)
+      else {
+        val selectors: Map[String, Map[String, JsValue]] =
+          origAttrToFilters.transform((_, attrFilters) => convertAttrToMangoJson(attrFilters))
+        val filteredSelectors = selectors.filter(t => t._2.nonEmpty)
+
+        if (filteredSelectors.nonEmpty) {
+          val queryColumns =
+            if (requiredcolumns == null || requiredcolumns.isEmpty) null
+            else Json.toJson(requiredcolumns)
+          (Json.toJson(filteredSelectors), queryColumns)
+        } else (null, null)
+      }
+    }
+
+    logger.info("calculated selector and fields:" + selector + "," + fields)
+
+    // Range lookups prefer the primary-key field, otherwise the first filtered field.
+    val searchField: String = {
+      if (origAttrToFilters == null) null
+      else if (filterInterpreter.containsFiltersFor(config.pkField)) config.pkField
+      else filterInterpreter.firstField
+    }
+
+    val (min, minInclusive, max, maxInclusive) = filterInterpreter.getInfo(searchField)
+    val (url: String, _, queryUsed: Boolean) = config.getRangeUrl(searchField,
+      min, minInclusive, max, maxInclusive, false, selector != null)
+
+    // Body for the row-count request, null unless the query endpoint is used.
+    // NOTE(review): presumably consumed implicitly by getTotalRows - confirm.
+    implicit val postData: String = {
+      if (queryUsed) {
+        Json.stringify(Json.obj("selector" -> selector, "limit" -> 1))
+      } else {
+        null
+      }
+    }
+    val totalRows = new JsonStoreDataAccess(config).getTotalRows(url, queryUsed)
+    val totalPartition = getTotalPartition(totalRows, queryUsed)
+    val limitPerPartition = getLimitPerPartition(totalRows, totalPartition)
+
logger.info(s"Partition config - total=$totalPartition, " +
s"limit=$limitPerPartition for totalRows of $totalRows")
- (0 until totalPartition).map(i => {
+    logger.info(s"Partition query info - url=$url, queryUsed=$queryUsed")
+
+    (0 until totalPartition).map(i => {
val skip = i * limitPerPartition
- new JsonStoreRDDPartition(skip, limitPerPartition, i, config,
- attrToFilters).asInstanceOf[Partition]
+      new JsonStoreRDDPartition(url, skip, limitPerPartition, i,
+        config, selector, fields, queryUsed).asInstanceOf[Partition]
}).toArray
}
override def compute(splitIn: Partition, context: TaskContext):
Iterator[String] = {
val myPartition = splitIn.asInstanceOf[JsonStoreRDDPartition]
+    // Cloudant Query body for this partition's page: selector, limit and skip, plus fields
+    // when specific columns were requested; null when the query endpoint is not used.
+    implicit val postData: String = {
+      if (myPartition.queryUsed) {
+        val page = Json.obj("selector" -> myPartition.selector, "limit" -> myPartition.limit,
+          "skip" -> myPartition.skip)
+        val query =
+          if (myPartition.fields != null) page + ("fields" -> myPartition.fields) else page
+        Json.stringify(query)
+      } else {
+        null
+      }
+    }
new JsonStoreDataAccess(myPartition.config).getIterator(myPartition.skip,
- myPartition.limit, url)
+      myPartition.limit, myPartition.url)
}
}