You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bahir.apache.org by lr...@apache.org on 2018/01/25 01:24:06 UTC

bahir git commit: [BAHIR-137] CouchDB/Cloudant _changes feed receiver improvements

Repository: bahir
Updated Branches:
  refs/heads/master 770b2916f -> ebdc8b257


[BAHIR-137] CouchDB/Cloudant _changes feed receiver improvements

Adds a batchInterval option for tuning the _changes receiver’s streaming batch interval
Throws a CloudantException if the final schema for the _changes receiver is empty
Calls the stop method in the streaming receiver when there’s an error

Closes #60


Project: http://git-wip-us.apache.org/repos/asf/bahir/repo
Commit: http://git-wip-us.apache.org/repos/asf/bahir/commit/ebdc8b25
Tree: http://git-wip-us.apache.org/repos/asf/bahir/tree/ebdc8b25
Diff: http://git-wip-us.apache.org/repos/asf/bahir/diff/ebdc8b25

Branch: refs/heads/master
Commit: ebdc8b257d32ff64a88657cc3e7dc838564a1d01
Parents: 770b291
Author: Esteban Laver <em...@us.ibm.com>
Authored: Mon Oct 2 11:09:28 2017 -0400
Committer: Luciano Resende <lr...@apache.org>
Committed: Wed Jan 24 20:03:20 2018 -0500

----------------------------------------------------------------------
 sql-cloudant/README.md                              |  1 +
 sql-cloudant/src/main/resources/application.conf    |  1 +
 .../bahir/cloudant/CloudantChangesConfig.scala      | 11 ++++++++++-
 .../org/apache/bahir/cloudant/DefaultSource.scala   | 11 ++++++++---
 .../cloudant/common/JsonStoreConfigManager.scala    |  6 ++++--
 .../bahir/cloudant/internal/ChangesReceiver.scala   |  4 +++-
 .../apache/bahir/cloudant/CloudantOptionSuite.scala | 16 ++++++++++++++++
 7 files changed, 43 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/bahir/blob/ebdc8b25/sql-cloudant/README.md
----------------------------------------------------------------------
diff --git a/sql-cloudant/README.md b/sql-cloudant/README.md
index 18118eb..160e279 100644
--- a/sql-cloudant/README.md
+++ b/sql-cloudant/README.md
@@ -57,6 +57,7 @@ Default values are defined in [here](src/main/resources/application.conf).
 
 Name | Default | Meaning
 --- |:---:| ---
+cloudant.batchInterval|8|number of seconds to set for streaming all documents from `_changes` endpoint into Spark dataframe.  See [Setting the right batch interval](https://spark.apache.org/docs/latest/streaming-programming-guide.html#setting-the-right-batch-interval) for tuning this value.
 cloudant.endpoint|`_all_docs`|endpoint for RelationProvider when loading data from Cloudant to DataFrames or SQL temporary tables. Select between the Cloudant `_all_docs` or `_changes` API endpoint.  See **Note** below for differences between endpoints.
 cloudant.protocol|https|protocol to use to transfer data: http or https
 cloudant.host| |cloudant host url

http://git-wip-us.apache.org/repos/asf/bahir/blob/ebdc8b25/sql-cloudant/src/main/resources/application.conf
----------------------------------------------------------------------
diff --git a/sql-cloudant/src/main/resources/application.conf b/sql-cloudant/src/main/resources/application.conf
index 62497e2..6ff4139 100644
--- a/sql-cloudant/src/main/resources/application.conf
+++ b/sql-cloudant/src/main/resources/application.conf
@@ -9,6 +9,7 @@ spark-sql {
         requestTimeout = 900000
     }
     cloudant = {
+        batchInterval = 8
         endpoint = "_all_docs"
         protocol = https
         useQuery = false

http://git-wip-us.apache.org/repos/asf/bahir/blob/ebdc8b25/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/CloudantChangesConfig.scala
----------------------------------------------------------------------
diff --git a/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/CloudantChangesConfig.scala b/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/CloudantChangesConfig.scala
index 0e70b95..9f2a7ba 100644
--- a/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/CloudantChangesConfig.scala
+++ b/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/CloudantChangesConfig.scala
@@ -27,13 +27,17 @@ class CloudantChangesConfig(protocol: String, host: String, dbName: String,
                             bulkSize: Int, schemaSampleSize: Int,
                             createDBOnSave: Boolean, endpoint: String, selector: String,
                             timeout: Int, storageLevel: StorageLevel, useQuery: Boolean,
-                            queryLimit: Int)
+                            queryLimit: Int, batchInterval: Int)
   extends CloudantConfig(protocol, host, dbName, indexName, viewName)(username, password,
     partitions, maxInPartition, minInPartition, requestTimeout, bulkSize, schemaSampleSize,
     createDBOnSave, endpoint, useQuery, queryLimit) {
 
   override val defaultIndex: String = endpoint
 
+  def getBatchInterval : Int = {
+    batchInterval
+  }
+
   def getSelector : String = {
     if (selector != null && !selector.isEmpty) {
       selector
@@ -80,3 +84,8 @@ class CloudantChangesConfig(protocol: String, host: String, dbName: String,
     dbUrl + "/" + JsonStoreConfigManager.ALL_DOCS_INDEX
   }
 }
+
+object CloudantChangesConfig {
+  // Error message from internal _changes receiver
+  var receiverErrorMsg: String = ""
+}

http://git-wip-us.apache.org/repos/asf/bahir/blob/ebdc8b25/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/DefaultSource.scala
----------------------------------------------------------------------
diff --git a/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/DefaultSource.scala b/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/DefaultSource.scala
index 1596133..36c2c78 100644
--- a/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/DefaultSource.scala
+++ b/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/DefaultSource.scala
@@ -125,9 +125,10 @@ class DefaultSource extends RelationProvider
           /* Create a streaming context to handle transforming docs in
           * larger databases into Spark datasets
           */
-          val ssc = new StreamingContext(sqlContext.sparkContext, Seconds(8))
-
           val changesConfig = config.asInstanceOf[CloudantChangesConfig]
+          val ssc = new StreamingContext(sqlContext.sparkContext,
+            Seconds(changesConfig.getBatchInterval))
+
           val changes = ssc.receiverStream(
             new ChangesReceiver(changesConfig))
           changes.persist(changesConfig.getStorageLevelForStreaming)
@@ -158,7 +159,11 @@ class DefaultSource extends RelationProvider
           // run streaming until all docs from continuous feed are received
           ssc.awaitTermination
 
-          dataFrame.schema
+          if(dataFrame.schema.nonEmpty) {
+            dataFrame.schema
+          } else {
+            throw new CloudantException(CloudantChangesConfig.receiverErrorMsg)
+          }
         }
       }
       CloudantReadWriteRelation(config, schema, dataFrame)(sqlContext)

http://git-wip-us.apache.org/repos/asf/bahir/blob/ebdc8b25/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/common/JsonStoreConfigManager.scala
----------------------------------------------------------------------
diff --git a/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/common/JsonStoreConfigManager.scala b/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/common/JsonStoreConfigManager.scala
index 40b4b1a..9cd495d 100644
--- a/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/common/JsonStoreConfigManager.scala
+++ b/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/common/JsonStoreConfigManager.scala
@@ -18,8 +18,8 @@ package org.apache.bahir.cloudant.common
 
 import com.typesafe.config.ConfigFactory
 
-import org.apache.spark.sql.SQLContext
 import org.apache.spark.SparkConf
+import org.apache.spark.sql.SQLContext
 import org.apache.spark.storage.StorageLevel
 
 import org.apache.bahir.cloudant.{CloudantChangesConfig, CloudantConfig}
@@ -35,6 +35,7 @@ object JsonStoreConfigManager {
   private val CLOUDANT_PASSWORD_CONFIG = "cloudant.password"
   private val CLOUDANT_PROTOCOL_CONFIG = "cloudant.protocol"
   private val CLOUDANT_API_ENDPOINT = "cloudant.endpoint"
+  private val CLOUDANT_STREAMING_BATCH_INTERVAL = "cloudant.batchInterval"
   private val STORAGE_LEVEL_FOR_CHANGES_INDEX = "cloudant.storageLevel"
   private val CLOUDANT_CHANGES_TIMEOUT = "cloudant.timeout"
   private val USE_QUERY_CONFIG = "cloudant.useQuery"
@@ -173,6 +174,7 @@ object JsonStoreConfigManager {
     implicit val storageLevel = getStorageLevel(
       sparkConf, parameters, STORAGE_LEVEL_FOR_CHANGES_INDEX)
     implicit val timeout = getInt(sparkConf, parameters, CLOUDANT_CHANGES_TIMEOUT)
+    implicit val batchInterval = getInt(sparkConf, parameters, CLOUDANT_STREAMING_BATCH_INTERVAL)
 
     implicit val useQuery = getBool(sparkConf, parameters, USE_QUERY_CONFIG)
     implicit val queryLimit = getInt(sparkConf, parameters, QUERY_LIMIT_CONFIG)
@@ -197,7 +199,7 @@ object JsonStoreConfigManager {
       new CloudantChangesConfig(protocol, host, dbName, indexName,
         viewName) (user, passwd, total, max, min, requestTimeout,
         bulkSize, schemaSampleSize, createDBOnSave, endpoint, selector,
-        timeout, storageLevel, useQuery, queryLimit)
+        timeout, storageLevel, useQuery, queryLimit, batchInterval)
     } else {
       throw new CloudantException(s"spark.$CLOUDANT_API_ENDPOINT parameter " +
         s"is invalid. Please supply the valid option '" + ALL_DOCS_INDEX + "' or '" +

http://git-wip-us.apache.org/repos/asf/bahir/blob/ebdc8b25/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/internal/ChangesReceiver.scala
----------------------------------------------------------------------
diff --git a/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/internal/ChangesReceiver.scala b/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/internal/ChangesReceiver.scala
index ac0aac6..323aab6 100644
--- a/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/internal/ChangesReceiver.scala
+++ b/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/internal/ChangesReceiver.scala
@@ -70,8 +70,10 @@ class ChangesReceiver(config: CloudantChangesConfig)
         }
       } else {
         val status = headers.getOrElse("Status", IndexedSeq.empty)
-        val errorMsg = "Error retrieving _changes feed " + config.getDbname + ": " + status(0)
+        val errorMsg = "Error retrieving _changes feed data from database " +
+          "'" + config.getDbname + "': " + status(0)
         reportError(errorMsg, new CloudantException(errorMsg))
+        CloudantChangesConfig.receiverErrorMsg = errorMsg
       }
     })
   }

http://git-wip-us.apache.org/repos/asf/bahir/blob/ebdc8b25/sql-cloudant/src/test/scala/org/apache/bahir/cloudant/CloudantOptionSuite.scala
----------------------------------------------------------------------
diff --git a/sql-cloudant/src/test/scala/org/apache/bahir/cloudant/CloudantOptionSuite.scala b/sql-cloudant/src/test/scala/org/apache/bahir/cloudant/CloudantOptionSuite.scala
index a8c8646..8495026 100644
--- a/sql-cloudant/src/test/scala/org/apache/bahir/cloudant/CloudantOptionSuite.scala
+++ b/sql-cloudant/src/test/scala/org/apache/bahir/cloudant/CloudantOptionSuite.scala
@@ -86,4 +86,20 @@ class CloudantOptionSuite extends ClientSparkFunSuite with BeforeAndAfter {
     assert(thrown.getMessage === s"Cloudant database name is empty. " +
       s"Please supply the required value.")
   }
+
+  testIfEnabled("incorrect password throws an error message for changes receiver") {
+    spark = SparkSession.builder().config(conf)
+      .config("cloudant.protocol", TestUtils.getProtocol)
+      .config("cloudant.host", TestUtils.getHost)
+      .config("cloudant.username", TestUtils.getUsername)
+      .config("cloudant.password", TestUtils.getPassword.concat("a"))
+      .config("cloudant.endpoint", "_changes")
+      .getOrCreate()
+
+    val thrown = intercept[CloudantException] {
+      spark.read.format("org.apache.bahir.cloudant").load("n_flight")
+    }
+    assert(thrown.getMessage === s"Error retrieving _changes feed data" +
+      s" from database 'n_flight': HTTP/1.1 401 Unauthorized")
+  }
 }