Posted to commits@bahir.apache.org by lr...@apache.org on 2018/01/25 04:47:55 UTC

bahir git commit: [BAHIR-138] Fix deprecation warning messages

Repository: bahir
Updated Branches:
  refs/heads/master ebdc8b257 -> 785ee1e1a


[BAHIR-138] Fix deprecation warning messages

- Imported `spark.implicits._` to convert Spark RDD to Dataset
- Replaced deprecated `json(RDD[String])` with `json(Dataset[String])` (see the sketch below)
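
For context, here is a minimal standalone sketch of the migration pattern this patch applies throughout DefaultSource.scala. The SparkSession setup, object name, and sample document are illustrative only and not part of the patch; the patch itself imports sqlContext.implicits._, which provides the same encoders as spark.implicits._ below.

    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.SparkSession

    object JsonDeprecationSketch {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .appName("json-deprecation-sketch")
          .master("local[*]")
          .getOrCreate()
        // Brings in the Encoder[String] that rdd.toDS() requires
        import spark.implicits._

        val jsonRDD: RDD[String] =
          spark.sparkContext.parallelize(Seq("""{"_id": "doc1", "value": 1}"""))

        // Deprecated since Spark 2.2:
        //   val df = spark.read.json(jsonRDD)

        // Replacement: convert RDD[String] to Dataset[String] first
        val df = spark.read.json(jsonRDD.toDS())
        df.printSchema()

        spark.stop()
      }
    }

The same .toDS() conversion is what the diff below applies to cloudantRDD, aRDD, and globalRDD before calling read.json.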

Closes #63


Project: http://git-wip-us.apache.org/repos/asf/bahir/repo
Commit: http://git-wip-us.apache.org/repos/asf/bahir/commit/785ee1e1
Tree: http://git-wip-us.apache.org/repos/asf/bahir/tree/785ee1e1
Diff: http://git-wip-us.apache.org/repos/asf/bahir/diff/785ee1e1

Branch: refs/heads/master
Commit: 785ee1e1acfb129bb0524d79df3372968b9e95a7
Parents: ebdc8b2
Author: Esteban Laver <em...@us.ibm.com>
Authored: Fri Jan 12 00:26:29 2018 -0500
Committer: Luciano Resende <lr...@apache.org>
Committed: Wed Jan 24 23:47:19 2018 -0500

----------------------------------------------------------------------
 .../apache/bahir/cloudant/DefaultSource.scala   | 243 ++++++++++---------
 1 file changed, 122 insertions(+), 121 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/bahir/blob/785ee1e1/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/DefaultSource.scala
----------------------------------------------------------------------
diff --git a/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/DefaultSource.scala b/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/DefaultSource.scala
index 36c2c78..2685993 100644
--- a/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/DefaultSource.scala
+++ b/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/DefaultSource.scala
@@ -30,57 +30,58 @@ import org.apache.bahir.cloudant.internal.ChangesReceiver
 case class CloudantReadWriteRelation (config: CloudantConfig,
                                       schema: StructType,
                                       dataFrame: DataFrame = null)
-                      (@transient val sqlContext: SQLContext)
+                                     (@transient val sqlContext: SQLContext)
   extends BaseRelation with PrunedFilteredScan  with InsertableRelation {
 
-   @transient lazy val dataAccess: JsonStoreDataAccess = {new JsonStoreDataAccess(config)}
+  @transient lazy val dataAccess: JsonStoreDataAccess = {new JsonStoreDataAccess(config)}
 
-    implicit lazy val logger: Logger = LoggerFactory.getLogger(getClass)
+  implicit lazy val logger: Logger = LoggerFactory.getLogger(getClass)
 
-    def buildScan(requiredColumns: Array[String],
+  import sqlContext.implicits._
+
+  def buildScan(requiredColumns: Array[String],
                 filters: Array[Filter]): RDD[Row] = {
-      val colsLength = requiredColumns.length
+    val colsLength = requiredColumns.length
 
-      if (dataFrame != null) {
-        if (colsLength == 0) {
-          dataFrame.select().rdd
-        } else if (colsLength == 1) {
-          dataFrame.select(requiredColumns(0)).rdd
-        } else {
-          val colsExceptCol0 = for (i <- 1 until colsLength) yield requiredColumns(i)
-          dataFrame.select(requiredColumns(0), colsExceptCol0: _*).rdd
-        }
+    if (dataFrame != null) {
+      if (colsLength == 0) {
+        dataFrame.select().rdd
+      } else if (colsLength == 1) {
+        dataFrame.select(requiredColumns(0)).rdd
       } else {
-        implicit val columns : Array[String] = requiredColumns
-        implicit val origFilters : Array[Filter] = filters
-
-        logger.info("buildScan:" + columns + "," + origFilters)
-        val cloudantRDD = new JsonStoreRDD(sqlContext.sparkContext, config)
-        val df = sqlContext.read.json(cloudantRDD)
-        if (colsLength > 1) {
-          val colsExceptCol0 = for (i <- 1 until colsLength) yield requiredColumns(i)
-          df.select(requiredColumns(0), colsExceptCol0: _*).rdd
-        } else {
-          df.rdd
-        }
+        val colsExceptCol0 = for (i <- 1 until colsLength) yield requiredColumns(i)
+        dataFrame.select(requiredColumns(0), colsExceptCol0: _*).rdd
+      }
+    } else {
+      implicit val columns : Array[String] = requiredColumns
+      implicit val origFilters : Array[Filter] = filters
+
+      logger.info("buildScan:" + columns + "," + origFilters)
+      val cloudantRDD = new JsonStoreRDD(sqlContext.sparkContext, config)
+      val df = sqlContext.read.json(cloudantRDD.toDS())
+      if (colsLength > 1) {
+        val colsExceptCol0 = for (i <- 1 until colsLength) yield requiredColumns(i)
+        df.select(requiredColumns(0), colsExceptCol0: _*).rdd
+      } else {
+        df.rdd
       }
     }
-
+  }
 
   def insert(data: DataFrame, overwrite: Boolean): Unit = {
-      if (config.getCreateDBonSave) {
-        dataAccess.createDB()
-      }
-      if (data.count() == 0) {
-        logger.warn("Database " + config.getDbname +
-          ": nothing was saved because the number of records was 0!")
-      } else {
-        val result = data.toJSON.foreachPartition { x =>
-          val list = x.toList // Has to pass as List, Iterator results in 0 data
-          dataAccess.saveAll(list)
-        }
+    if (config.getCreateDBonSave) {
+      dataAccess.createDB()
+    }
+    if (data.count() == 0) {
+      logger.warn("Database " + config.getDbname +
+        ": nothing was saved because the number of records was 0!")
+    } else {
+      val result = data.toJSON.foreachPartition { x =>
+        val list = x.toList // Has to pass as List, Iterator results in 0 data
+        dataAccess.saveAll(list)
       }
     }
+  }
 }
 
 class DefaultSource extends RelationProvider
@@ -91,97 +92,97 @@ class DefaultSource extends RelationProvider
 
   def createRelation(sqlContext: SQLContext,
                      parameters: Map[String, String]): CloudantReadWriteRelation = {
-      create(sqlContext, parameters, null)
-    }
-
-    private def create(sqlContext: SQLContext,
-                       parameters: Map[String, String],
-                       inSchema: StructType) = {
-
-      val config: CloudantConfig = JsonStoreConfigManager.getConfig(sqlContext, parameters)
-
-      var dataFrame: DataFrame = null
-
-      val schema: StructType = {
-        if (inSchema != null) {
-          inSchema
-        } else if (!config.isInstanceOf[CloudantChangesConfig]
-          || config.viewName != null || config.indexName != null) {
-          val df = if (config.getSchemaSampleSize ==
-            JsonStoreConfigManager.ALLDOCS_OR_CHANGES_LIMIT &&
-            config.viewName == null
-            && config.indexName == null) {
-            val cloudantRDD = new JsonStoreRDD(sqlContext.sparkContext, config)
-            dataFrame = sqlContext.read.json(cloudantRDD)
-            dataFrame
-          } else {
-            val dataAccess = new JsonStoreDataAccess(config)
-            val aRDD = sqlContext.sparkContext.parallelize(
-                dataAccess.getMany(config.getSchemaSampleSize))
-            sqlContext.read.json(aRDD)
-          }
-          df.schema
+    create(sqlContext, parameters, null)
+  }
+
+  private def create(sqlContext: SQLContext,
+                     parameters: Map[String, String],
+                     inSchema: StructType) = {
+
+    import sqlContext.implicits._
+
+    val config: CloudantConfig = JsonStoreConfigManager.getConfig(sqlContext, parameters)
+
+    var dataFrame: DataFrame = null
+
+    val schema: StructType = {
+      if (inSchema != null) {
+        inSchema
+      } else if (!config.isInstanceOf[CloudantChangesConfig]
+        || config.viewName != null || config.indexName != null) {
+        val df = if (config.getSchemaSampleSize ==
+          JsonStoreConfigManager.ALLDOCS_OR_CHANGES_LIMIT &&
+          config.viewName == null
+          && config.indexName == null) {
+          val cloudantRDD = new JsonStoreRDD(sqlContext.sparkContext, config)
+          dataFrame = sqlContext.read.json(cloudantRDD.toDS())
+          dataFrame
         } else {
-          /* Create a streaming context to handle transforming docs in
-          * larger databases into Spark datasets
-          */
-          val changesConfig = config.asInstanceOf[CloudantChangesConfig]
-          val ssc = new StreamingContext(sqlContext.sparkContext,
-            Seconds(changesConfig.getBatchInterval))
-
-          val changes = ssc.receiverStream(
-            new ChangesReceiver(changesConfig))
-          changes.persist(changesConfig.getStorageLevelForStreaming)
-
-          // Global RDD that's created from union of all RDDs
-          var globalRDD = ssc.sparkContext.emptyRDD[String]
-
-          logger.info("Loading data from Cloudant using "
-            + changesConfig.getChangesReceiverUrl)
-
-          // Collect and union each RDD to convert all RDDs to a DataFrame
-          changes.foreachRDD((rdd: RDD[String]) => {
-            if (!rdd.isEmpty()) {
-              if (globalRDD != null) {
-                // Union RDDs in foreach loop
-                globalRDD = globalRDD.union(rdd)
-              } else {
-                globalRDD = rdd
-              }
+          val dataAccess = new JsonStoreDataAccess(config)
+          val aRDD = sqlContext.sparkContext.parallelize(
+            dataAccess.getMany(config.getSchemaSampleSize))
+          sqlContext.read.json(aRDD.toDS())
+        }
+        df.schema
+      } else {
+        /* Create a streaming context to handle transforming docs in
+        * larger databases into Spark datasets
+        */
+        val changesConfig = config.asInstanceOf[CloudantChangesConfig]
+        val ssc = new StreamingContext(sqlContext.sparkContext,
+          Seconds(changesConfig.getBatchInterval))
+
+        val changes = ssc.receiverStream(new ChangesReceiver(changesConfig))
+        changes.persist(changesConfig.getStorageLevelForStreaming)
+
+        // Global RDD that's created from union of all RDDs
+        var globalRDD = ssc.sparkContext.emptyRDD[String]
+
+        logger.info("Loading data from Cloudant using "
+          + changesConfig.getChangesReceiverUrl)
+
+        // Collect and union each RDD to convert all RDDs to a DataFrame
+        changes.foreachRDD((rdd: RDD[String]) => {
+          if (!rdd.isEmpty()) {
+            if (globalRDD != null) {
+              // Union RDDs in foreach loop
+              globalRDD = globalRDD.union(rdd)
             } else {
-              // Convert final global RDD[String] to DataFrame
-              dataFrame = sqlContext.sparkSession.read.json(globalRDD)
-              ssc.stop(stopSparkContext = false, stopGracefully = false)
+              globalRDD = rdd
             }
-          })
-
-          ssc.start
-          // run streaming until all docs from continuous feed are received
-          ssc.awaitTermination
-
-          if(dataFrame.schema.nonEmpty) {
-            dataFrame.schema
           } else {
-            throw new CloudantException(CloudantChangesConfig.receiverErrorMsg)
+            // Convert final global RDD[String] to DataFrame
+            dataFrame = sqlContext.sparkSession.read.json(globalRDD.toDS())
+            ssc.stop(stopSparkContext = false, stopGracefully = false)
           }
+        })
+
+        ssc.start
+        // run streaming until all docs from continuous feed are received
+        ssc.awaitTermination
+
+        if(dataFrame.schema.nonEmpty) {
+          dataFrame.schema
+        } else {
+          throw new CloudantException(CloudantChangesConfig.receiverErrorMsg)
         }
       }
-      CloudantReadWriteRelation(config, schema, dataFrame)(sqlContext)
-    }
-
-    def createRelation(sqlContext: SQLContext,
-                       mode: SaveMode,
-                       parameters: Map[String, String],
-                       data: DataFrame): CloudantReadWriteRelation = {
-      val relation = create(sqlContext, parameters, data.schema)
-      relation.insert(data, mode==SaveMode.Overwrite)
-      relation
     }
+    CloudantReadWriteRelation(config, schema, dataFrame)(sqlContext)
+  }
 
-    def createRelation(sqlContext: SQLContext,
-                       parameters: Map[String, String],
-                       schema: StructType): CloudantReadWriteRelation = {
-      create(sqlContext, parameters, schema)
-    }
+  def createRelation(sqlContext: SQLContext,
+                     mode: SaveMode,
+                     parameters: Map[String, String],
+                     data: DataFrame): CloudantReadWriteRelation = {
+    val relation = create(sqlContext, parameters, data.schema)
+    relation.insert(data, mode==SaveMode.Overwrite)
+    relation
+  }
 
+  def createRelation(sqlContext: SQLContext,
+                     parameters: Map[String, String],
+                     schema: StructType): CloudantReadWriteRelation = {
+    create(sqlContext, parameters, schema)
+  }
 }