You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bahir.apache.org by lr...@apache.org on 2018/01/25 01:24:06 UTC
bahir git commit: [BAHIR-137] CouchDB/Cloudant _changes feed receiver
improvements
Repository: bahir
Updated Branches:
refs/heads/master 770b2916f -> ebdc8b257
[BAHIR-137] CouchDB/Cloudant _changes feed receiver improvements
Adds batchInterval option for tuning _changes receiver’s streaming batch interval
Throws a CloudantException if the final schema for the _changes receiver is empty
Calls stop method in streaming receiver when there’s an error
Closes #60
Project: http://git-wip-us.apache.org/repos/asf/bahir/repo
Commit: http://git-wip-us.apache.org/repos/asf/bahir/commit/ebdc8b25
Tree: http://git-wip-us.apache.org/repos/asf/bahir/tree/ebdc8b25
Diff: http://git-wip-us.apache.org/repos/asf/bahir/diff/ebdc8b25
Branch: refs/heads/master
Commit: ebdc8b257d32ff64a88657cc3e7dc838564a1d01
Parents: 770b291
Author: Esteban Laver <em...@us.ibm.com>
Authored: Mon Oct 2 11:09:28 2017 -0400
Committer: Luciano Resende <lr...@apache.org>
Committed: Wed Jan 24 20:03:20 2018 -0500
----------------------------------------------------------------------
sql-cloudant/README.md | 1 +
sql-cloudant/src/main/resources/application.conf | 1 +
.../bahir/cloudant/CloudantChangesConfig.scala | 11 ++++++++++-
.../org/apache/bahir/cloudant/DefaultSource.scala | 11 ++++++++---
.../cloudant/common/JsonStoreConfigManager.scala | 6 ++++--
.../bahir/cloudant/internal/ChangesReceiver.scala | 4 +++-
.../apache/bahir/cloudant/CloudantOptionSuite.scala | 16 ++++++++++++++++
7 files changed, 43 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/bahir/blob/ebdc8b25/sql-cloudant/README.md
----------------------------------------------------------------------
diff --git a/sql-cloudant/README.md b/sql-cloudant/README.md
index 18118eb..160e279 100644
--- a/sql-cloudant/README.md
+++ b/sql-cloudant/README.md
@@ -57,6 +57,7 @@ Default values are defined in [here](src/main/resources/application.conf).
Name | Default | Meaning
--- |:---:| ---
+cloudant.batchInterval|8|number of seconds to set for streaming all documents from `_changes` endpoint into Spark dataframe. See [Setting the right batch interval](https://spark.apache.org/docs/latest/streaming-programming-guide.html#setting-the-right-batch-interval) for tuning this value.
cloudant.endpoint|`_all_docs`|endpoint for RelationProvider when loading data from Cloudant to DataFrames or SQL temporary tables. Select between the Cloudant `_all_docs` or `_changes` API endpoint. See **Note** below for differences between endpoints.
cloudant.protocol|https|protocol to use to transfer data: http or https
cloudant.host| |cloudant host url
http://git-wip-us.apache.org/repos/asf/bahir/blob/ebdc8b25/sql-cloudant/src/main/resources/application.conf
----------------------------------------------------------------------
diff --git a/sql-cloudant/src/main/resources/application.conf b/sql-cloudant/src/main/resources/application.conf
index 62497e2..6ff4139 100644
--- a/sql-cloudant/src/main/resources/application.conf
+++ b/sql-cloudant/src/main/resources/application.conf
@@ -9,6 +9,7 @@ spark-sql {
requestTimeout = 900000
}
cloudant = {
+ batchInterval = 8
endpoint = "_all_docs"
protocol = https
useQuery = false
http://git-wip-us.apache.org/repos/asf/bahir/blob/ebdc8b25/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/CloudantChangesConfig.scala
----------------------------------------------------------------------
diff --git a/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/CloudantChangesConfig.scala b/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/CloudantChangesConfig.scala
index 0e70b95..9f2a7ba 100644
--- a/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/CloudantChangesConfig.scala
+++ b/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/CloudantChangesConfig.scala
@@ -27,13 +27,17 @@ class CloudantChangesConfig(protocol: String, host: String, dbName: String,
bulkSize: Int, schemaSampleSize: Int,
createDBOnSave: Boolean, endpoint: String, selector: String,
timeout: Int, storageLevel: StorageLevel, useQuery: Boolean,
- queryLimit: Int)
+ queryLimit: Int, batchInterval: Int)
extends CloudantConfig(protocol, host, dbName, indexName, viewName)(username, password,
partitions, maxInPartition, minInPartition, requestTimeout, bulkSize, schemaSampleSize,
createDBOnSave, endpoint, useQuery, queryLimit) {
override val defaultIndex: String = endpoint
+ def getBatchInterval : Int = {
+ batchInterval
+ }
+
def getSelector : String = {
if (selector != null && !selector.isEmpty) {
selector
@@ -80,3 +84,8 @@ class CloudantChangesConfig(protocol: String, host: String, dbName: String,
dbUrl + "/" + JsonStoreConfigManager.ALL_DOCS_INDEX
}
}
+
+object CloudantChangesConfig {
+ // Error message from internal _changes receiver
+ var receiverErrorMsg: String = ""
+}
http://git-wip-us.apache.org/repos/asf/bahir/blob/ebdc8b25/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/DefaultSource.scala
----------------------------------------------------------------------
diff --git a/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/DefaultSource.scala b/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/DefaultSource.scala
index 1596133..36c2c78 100644
--- a/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/DefaultSource.scala
+++ b/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/DefaultSource.scala
@@ -125,9 +125,10 @@ class DefaultSource extends RelationProvider
/* Create a streaming context to handle transforming docs in
* larger databases into Spark datasets
*/
- val ssc = new StreamingContext(sqlContext.sparkContext, Seconds(8))
-
val changesConfig = config.asInstanceOf[CloudantChangesConfig]
+ val ssc = new StreamingContext(sqlContext.sparkContext,
+ Seconds(changesConfig.getBatchInterval))
+
val changes = ssc.receiverStream(
new ChangesReceiver(changesConfig))
changes.persist(changesConfig.getStorageLevelForStreaming)
@@ -158,7 +159,11 @@ class DefaultSource extends RelationProvider
// run streaming until all docs from continuous feed are received
ssc.awaitTermination
- dataFrame.schema
+ if(dataFrame.schema.nonEmpty) {
+ dataFrame.schema
+ } else {
+ throw new CloudantException(CloudantChangesConfig.receiverErrorMsg)
+ }
}
}
CloudantReadWriteRelation(config, schema, dataFrame)(sqlContext)
http://git-wip-us.apache.org/repos/asf/bahir/blob/ebdc8b25/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/common/JsonStoreConfigManager.scala
----------------------------------------------------------------------
diff --git a/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/common/JsonStoreConfigManager.scala b/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/common/JsonStoreConfigManager.scala
index 40b4b1a..9cd495d 100644
--- a/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/common/JsonStoreConfigManager.scala
+++ b/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/common/JsonStoreConfigManager.scala
@@ -18,8 +18,8 @@ package org.apache.bahir.cloudant.common
import com.typesafe.config.ConfigFactory
-import org.apache.spark.sql.SQLContext
import org.apache.spark.SparkConf
+import org.apache.spark.sql.SQLContext
import org.apache.spark.storage.StorageLevel
import org.apache.bahir.cloudant.{CloudantChangesConfig, CloudantConfig}
@@ -35,6 +35,7 @@ object JsonStoreConfigManager {
private val CLOUDANT_PASSWORD_CONFIG = "cloudant.password"
private val CLOUDANT_PROTOCOL_CONFIG = "cloudant.protocol"
private val CLOUDANT_API_ENDPOINT = "cloudant.endpoint"
+ private val CLOUDANT_STREAMING_BATCH_INTERVAL = "cloudant.batchInterval"
private val STORAGE_LEVEL_FOR_CHANGES_INDEX = "cloudant.storageLevel"
private val CLOUDANT_CHANGES_TIMEOUT = "cloudant.timeout"
private val USE_QUERY_CONFIG = "cloudant.useQuery"
@@ -173,6 +174,7 @@ object JsonStoreConfigManager {
implicit val storageLevel = getStorageLevel(
sparkConf, parameters, STORAGE_LEVEL_FOR_CHANGES_INDEX)
implicit val timeout = getInt(sparkConf, parameters, CLOUDANT_CHANGES_TIMEOUT)
+ implicit val batchInterval = getInt(sparkConf, parameters, CLOUDANT_STREAMING_BATCH_INTERVAL)
implicit val useQuery = getBool(sparkConf, parameters, USE_QUERY_CONFIG)
implicit val queryLimit = getInt(sparkConf, parameters, QUERY_LIMIT_CONFIG)
@@ -197,7 +199,7 @@ object JsonStoreConfigManager {
new CloudantChangesConfig(protocol, host, dbName, indexName,
viewName) (user, passwd, total, max, min, requestTimeout,
bulkSize, schemaSampleSize, createDBOnSave, endpoint, selector,
- timeout, storageLevel, useQuery, queryLimit)
+ timeout, storageLevel, useQuery, queryLimit, batchInterval)
} else {
throw new CloudantException(s"spark.$CLOUDANT_API_ENDPOINT parameter " +
s"is invalid. Please supply the valid option '" + ALL_DOCS_INDEX + "' or '" +
http://git-wip-us.apache.org/repos/asf/bahir/blob/ebdc8b25/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/internal/ChangesReceiver.scala
----------------------------------------------------------------------
diff --git a/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/internal/ChangesReceiver.scala b/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/internal/ChangesReceiver.scala
index ac0aac6..323aab6 100644
--- a/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/internal/ChangesReceiver.scala
+++ b/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/internal/ChangesReceiver.scala
@@ -70,8 +70,10 @@ class ChangesReceiver(config: CloudantChangesConfig)
}
} else {
val status = headers.getOrElse("Status", IndexedSeq.empty)
- val errorMsg = "Error retrieving _changes feed " + config.getDbname + ": " + status(0)
+ val errorMsg = "Error retrieving _changes feed data from database " +
+ "'" + config.getDbname + "': " + status(0)
reportError(errorMsg, new CloudantException(errorMsg))
+ CloudantChangesConfig.receiverErrorMsg = errorMsg
}
})
}
http://git-wip-us.apache.org/repos/asf/bahir/blob/ebdc8b25/sql-cloudant/src/test/scala/org/apache/bahir/cloudant/CloudantOptionSuite.scala
----------------------------------------------------------------------
diff --git a/sql-cloudant/src/test/scala/org/apache/bahir/cloudant/CloudantOptionSuite.scala b/sql-cloudant/src/test/scala/org/apache/bahir/cloudant/CloudantOptionSuite.scala
index a8c8646..8495026 100644
--- a/sql-cloudant/src/test/scala/org/apache/bahir/cloudant/CloudantOptionSuite.scala
+++ b/sql-cloudant/src/test/scala/org/apache/bahir/cloudant/CloudantOptionSuite.scala
@@ -86,4 +86,20 @@ class CloudantOptionSuite extends ClientSparkFunSuite with BeforeAndAfter {
assert(thrown.getMessage === s"Cloudant database name is empty. " +
s"Please supply the required value.")
}
+
+ testIfEnabled("incorrect password throws an error message for changes receiver") {
+ spark = SparkSession.builder().config(conf)
+ .config("cloudant.protocol", TestUtils.getProtocol)
+ .config("cloudant.host", TestUtils.getHost)
+ .config("cloudant.username", TestUtils.getUsername)
+ .config("cloudant.password", TestUtils.getPassword.concat("a"))
+ .config("cloudant.endpoint", "_changes")
+ .getOrCreate()
+
+ val thrown = intercept[CloudantException] {
+ spark.read.format("org.apache.bahir.cloudant").load("n_flight")
+ }
+ assert(thrown.getMessage === s"Error retrieving _changes feed data" +
+ s" from database 'n_flight': HTTP/1.1 401 Unauthorized")
+ }
}