Posted to commits@bahir.apache.org by lr...@apache.org on 2017/12/21 00:29:56 UTC

bahir git commit: [BAHIR-138] fix deprecation warnings in sql-cloudant

Repository: bahir
Updated Branches:
  refs/heads/master 0e1505a89 -> eae02f29e


[BAHIR-138] fix deprecation warnings in sql-cloudant

Fix deprecation warnings in the DefaultSource class and in the
CloudantStreaming and CloudantStreamingSelector examples.

How:

- Imported spark.implicits._ to convert a Spark RDD to a Dataset
- Replaced the deprecated json(RDD[String]) with json(Dataset[String])
  (see the sketch after this list)
- Improved the streaming examples:
  - Replaced registerTempTable with the preferred createOrReplaceTempView
  - Replaced !isEmpty with nonEmpty
  - Used the publicly accessible sales database so users can run the
    examples without any setup
- Fixed an error message seen when stopping tests by adding logic to the
  streaming receiver so it does not store documents in Spark memory after
  the stream has stopped (see the receiver sketch after the
  CloudantReceiver diff)
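
For reference, a minimal sketch of the deprecation fixes above. It is an
illustration, not the example code itself: the object name, the sample
JSON record, and the sales view contents are assumptions.

    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.SparkSession

    object DeprecationFixSketch {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .appName("deprecation-fix-sketch")
          .master("local[*]")
          .getOrCreate()
        import spark.implicits._  // enables rdd.toDS()

        // Hypothetical stand-in for a batch of Cloudant change documents
        val rdd: RDD[String] = spark.sparkContext.parallelize(Seq(
          """{"rep": "John", "amount": 2500, "month": "May"}"""))

        // Before (deprecated since Spark 2.2): spark.read.json(rdd)
        val df = spark.read.json(rdd.toDS())  // json(Dataset[String]) overload
        if (df.schema.nonEmpty) {             // nonEmpty instead of !isEmpty
          // createOrReplaceTempView replaces the deprecated registerTempTable
          df.createOrReplaceTempView("sales")
          spark.sql("select rep, amount from sales where month = 'May'").show(5)
        }
        spark.stop()
      }
    }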

Closes #59


Project: http://git-wip-us.apache.org/repos/asf/bahir/repo
Commit: http://git-wip-us.apache.org/repos/asf/bahir/commit/eae02f29
Tree: http://git-wip-us.apache.org/repos/asf/bahir/tree/eae02f29
Diff: http://git-wip-us.apache.org/repos/asf/bahir/diff/eae02f29

Branch: refs/heads/master
Commit: eae02f29eb011f50bc313714e6cde62ce65804c4
Parents: 0e1505a
Author: Esteban Laver <em...@us.ibm.com>
Authored: Mon Oct 2 16:18:40 2017 -0400
Committer: Luciano Resende <lr...@apache.org>
Committed: Wed Dec 20 08:49:02 2017 -0800

----------------------------------------------------------------------
 .../sql/cloudant/CloudantStreaming.scala        | 73 ++++++++------------
 .../cloudant/CloudantStreamingSelector.scala    | 26 ++++---
 .../bahir/cloudant/CloudantReceiver.scala       |  4 +-
 3 files changed, 47 insertions(+), 56 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/bahir/blob/eae02f29/sql-cloudant/examples/src/main/scala/org/apache/spark/examples/sql/cloudant/CloudantStreaming.scala
----------------------------------------------------------------------
diff --git a/sql-cloudant/examples/src/main/scala/org/apache/spark/examples/sql/cloudant/CloudantStreaming.scala b/sql-cloudant/examples/src/main/scala/org/apache/spark/examples/sql/cloudant/CloudantStreaming.scala
index a1de696..df00756 100644
--- a/sql-cloudant/examples/src/main/scala/org/apache/spark/examples/sql/cloudant/CloudantStreaming.scala
+++ b/sql-cloudant/examples/src/main/scala/org/apache/spark/examples/sql/cloudant/CloudantStreaming.scala
@@ -14,86 +14,71 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.spark.examples.sql.cloudant
 
 import org.apache.spark.rdd.RDD
-import org.apache.spark.SparkConf
 import org.apache.spark.sql.SparkSession
-import org.apache.spark.streaming.{ Seconds, StreamingContext, Time }
-import org.apache.spark.streaming.scheduler.{ StreamingListener, StreamingListenerReceiverError}
+import org.apache.spark.streaming.{Seconds, StreamingContext, Time}
 
 import org.apache.bahir.cloudant.CloudantReceiver
 
 object CloudantStreaming {
   def main(args: Array[String]) {
-    val sparkConf = new SparkConf().setAppName("Cloudant Spark SQL External Datasource in Scala")
+    val spark = SparkSession.builder()
+      .appName("Cloudant Spark SQL External Datasource in Scala")
+      .master("local[*]")
+      .getOrCreate()
+
     // Create the context with a 10 seconds batch size
-    val ssc = new StreamingContext(sparkConf, Seconds(10))
+    val ssc = new StreamingContext(spark.sparkContext, Seconds(10))
+    import spark.implicits._
 
-    val changes = ssc.receiverStream(new CloudantReceiver(sparkConf, Map(
-      "cloudant.host" -> "ACCOUNT.cloudant.com",
-      "cloudant.username" -> "USERNAME",
-      "cloudant.password" -> "PASSWORD",
-      "database" -> "n_airportcodemapping")))
+    val changes = ssc.receiverStream(new CloudantReceiver(spark.sparkContext.getConf, Map(
+      "cloudant.host" -> "examples.cloudant.com",
+      "database" -> "sales")))
 
     changes.foreachRDD((rdd: RDD[String], time: Time) => {
       // Get the singleton instance of SparkSession
-      val spark = SparkSessionSingleton.getInstance(rdd.sparkContext.getConf)
+
 
       println(s"========= $time =========")// scalastyle:ignore
-      // Convert RDD[String] to DataFrame
-      val changesDataFrame = spark.read.json(rdd)
-      if (!changesDataFrame.schema.isEmpty) {
+      // Convert RDD[String] to Dataset[String]
+      val changesDataFrame = spark.read.json(rdd.toDS())
+      if (changesDataFrame.schema.nonEmpty) {
         changesDataFrame.printSchema()
-        changesDataFrame.select("*").show()
 
         var hasDelRecord = false
-        var hasAirportNameField = false
+        var hasMonth = false
         for (field <- changesDataFrame.schema.fieldNames) {
           if ("_deleted".equals(field)) {
             hasDelRecord = true
           }
-          if ("airportName".equals(field)) {
-            hasAirportNameField = true
+          if ("month".equals(field)) {
+            hasMonth = true
           }
         }
         if (hasDelRecord) {
           changesDataFrame.filter(changesDataFrame("_deleted")).select("*").show()
         }
 
-        if (hasAirportNameField) {
-          changesDataFrame.filter(changesDataFrame("airportName") >= "Paris").select("*").show()
-          changesDataFrame.registerTempTable("airportcodemapping")
-          val airportCountsDataFrame =
+        if (hasMonth) {
+          changesDataFrame.filter(changesDataFrame("month") === "May").select("*").show(5)
+          changesDataFrame.createOrReplaceTempView("sales")
+          val salesInMayCountsDataFrame =
             spark.sql(
-                s"""
-                |select airportName, count(*) as total
-                |from airportcodemapping
-                |group by airportName
+              s"""
+                 |select rep, amount
+                 |from sales
+                 |where month = "May"
                 """.stripMargin)
-          airportCountsDataFrame.show()
+          salesInMayCountsDataFrame.show(5)
         }
       }
 
     })
     ssc.start()
-    // run streaming for 120 secs
-    Thread.sleep(120000L)
+    // run streaming for 60 secs
+    Thread.sleep(60000L)
     ssc.stop(true)
   }
 }
-
-/** Lazily instantiated singleton instance of SparkSession */
-object SparkSessionSingleton {
-  @transient  private var instance: SparkSession = _
-  def getInstance(sparkConf: SparkConf): SparkSession = {
-    if (instance == null) {
-      instance = SparkSession
-        .builder
-        .config(sparkConf)
-        .getOrCreate()
-    }
-    instance
-  }
-}

http://git-wip-us.apache.org/repos/asf/bahir/blob/eae02f29/sql-cloudant/examples/src/main/scala/org/apache/spark/examples/sql/cloudant/CloudantStreamingSelector.scala
----------------------------------------------------------------------
diff --git a/sql-cloudant/examples/src/main/scala/org/apache/spark/examples/sql/cloudant/CloudantStreamingSelector.scala b/sql-cloudant/examples/src/main/scala/org/apache/spark/examples/sql/cloudant/CloudantStreamingSelector.scala
index 51d939a..05eca9b 100644
--- a/sql-cloudant/examples/src/main/scala/org/apache/spark/examples/sql/cloudant/CloudantStreamingSelector.scala
+++ b/sql-cloudant/examples/src/main/scala/org/apache/spark/examples/sql/cloudant/CloudantStreamingSelector.scala
@@ -20,7 +20,6 @@ package org.apache.spark.examples.sql.cloudant
 import java.util.concurrent.atomic.AtomicLong
 
 import org.apache.spark.rdd.RDD
-import org.apache.spark.SparkConf
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.streaming.{ Seconds, StreamingContext, Time }
 
@@ -28,34 +27,39 @@ import org.apache.bahir.cloudant.CloudantReceiver
 
 object CloudantStreamingSelector {
   def main(args: Array[String]) {
-    val sparkConf = new SparkConf().setAppName("Cloudant Spark SQL External Datasource in Scala")
+    val spark = SparkSession.builder()
+      .appName("Cloudant Spark SQL External Datasource in Scala")
+      .master("local[*]")
+      .getOrCreate()
+
+    import spark.implicits._
 
     // Create the context with a 10 seconds batch size
-    val ssc = new StreamingContext(sparkConf, Seconds(10))
+    val ssc = new StreamingContext(spark.sparkContext, Seconds(10))
     val curTotalAmount = new AtomicLong(0)
     val curSalesCount = new AtomicLong(0)
     var batchAmount = 0L
 
-    val changes = ssc.receiverStream(new CloudantReceiver(sparkConf, Map(
-      "cloudant.host" -> "ACCOUNT.cloudant.com",
-      "cloudant.username" -> "USERNAME",
-      "cloudant.password" -> "PASSWORD",
+    val changes = ssc.receiverStream(new CloudantReceiver(spark.sparkContext.getConf, Map(
+      "cloudant.host" -> "examples.cloudant.com",
       "database" -> "sales",
       "selector" -> "{\"month\":\"May\", \"rep\":\"John\"}")))
 
     changes.foreachRDD((rdd: RDD[String], time: Time) => {
       // Get the singleton instance of SQLContext
-      val spark = SparkSessionSingleton.getInstance(rdd.sparkContext.getConf)
+
       println(s"========= $time =========") // scalastyle:ignore
-      val changesDataFrame = spark.read.json(rdd)
-      if (!changesDataFrame.schema.isEmpty) {
+      val changesDataFrame = spark.read.json(rdd.toDS())
+      if (changesDataFrame.schema.nonEmpty) {
         changesDataFrame.select("*").show()
         batchAmount = changesDataFrame.groupBy().sum("amount").collect()(0).getLong(0)
         curSalesCount.getAndAdd(changesDataFrame.count())
         curTotalAmount.getAndAdd(batchAmount)
         println("Current sales count:" + curSalesCount)// scalastyle:ignore
         println("Current total amount:" + curTotalAmount)// scalastyle:ignore
-        }
+      } else {
+        ssc.stop()
+      }
     })
 
     ssc.start()

http://git-wip-us.apache.org/repos/asf/bahir/blob/eae02f29/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/CloudantReceiver.scala
----------------------------------------------------------------------
diff --git a/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/CloudantReceiver.scala b/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/CloudantReceiver.scala
index c6bae2e..60a7d4a 100644
--- a/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/CloudantReceiver.scala
+++ b/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/CloudantReceiver.scala
@@ -74,7 +74,9 @@ class CloudantReceiver(sparkConf: SparkConf, cloudantParams: Map[String, String]
             var doc = ""
             if(jsonDoc != null) {
               doc = Json.stringify(jsonDoc)
-              store(doc)
+              if(!isStopped() && doc.nonEmpty) {
+                store(doc)
+              }
             }
           }
         })
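
For context, a minimal sketch of the same guard in a toy custom receiver.
The class is hypothetical and not part of this commit; it only mirrors the
isStopped() check above, which prevents store() from pushing documents into
Spark memory after the stream has stopped.

    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.receiver.Receiver

    // Hypothetical receiver for illustration only
    class GuardedLineReceiver(lines: Seq[String])
      extends Receiver[String](StorageLevel.MEMORY_ONLY) {

      def onStart(): Unit = {
        new Thread("guarded-line-receiver") {
          override def run(): Unit = {
            for (line <- lines) {
              // Skip store() once the stream is stopped; storing after stop
              // is what produced the error message this commit fixes
              if (!isStopped() && line.nonEmpty) {
                store(line)
              }
            }
          }
        }.start()
      }

      def onStop(): Unit = { }
    }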