Posted to commits@bahir.apache.org by lr...@apache.org on 2018/01/25 04:47:55 UTC
bahir git commit: [BAHIR-138] Fix deprecation warning messages
Repository: bahir
Updated Branches:
refs/heads/master ebdc8b257 -> 785ee1e1a
[BAHIR-138] Fix deprecation warning messages
- Imported `spark.implicits._` to convert a Spark RDD to a Dataset
- Replaced the deprecated `json(RDD[String])` with `json(Dataset[String])` (see the sketch after the commit message)
Closes #63
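For context, the migration pattern applied throughout DefaultSource.scala looks roughly like
the sketch below. This is a minimal, hypothetical example, not code from this commit: the
method name `readJson` and the RDD `jsonRDD` are illustrative, and it assumes a `SQLContext`
is available. The key point is that `DataFrameReader.json` no longer accepts an `RDD[String]`
without a deprecation warning, so the RDD is first converted to a `Dataset[String]` via
`toDS()`, which requires the implicit encoders from `sqlContext.implicits._`.

    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.{DataFrame, SQLContext}

    // Hypothetical helper illustrating the migration pattern used in this commit.
    def readJson(sqlContext: SQLContext, jsonRDD: RDD[String]): DataFrame = {
      // Brings the implicit String encoder and the RDD -> Dataset conversion into scope.
      import sqlContext.implicits._

      // Before (deprecated): sqlContext.read.json(jsonRDD)
      // After: convert the RDD[String] to a Dataset[String] and read that instead.
      sqlContext.read.json(jsonRDD.toDS())
    }

The same `import sqlContext.implicits._` / `toDS()` pair appears in both `buildScan` and
`create` in the diff below, so each `read.json(...)` call site receives a `Dataset[String]`
rather than an `RDD[String]`.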
Project: http://git-wip-us.apache.org/repos/asf/bahir/repo
Commit: http://git-wip-us.apache.org/repos/asf/bahir/commit/785ee1e1
Tree: http://git-wip-us.apache.org/repos/asf/bahir/tree/785ee1e1
Diff: http://git-wip-us.apache.org/repos/asf/bahir/diff/785ee1e1
Branch: refs/heads/master
Commit: 785ee1e1acfb129bb0524d79df3372968b9e95a7
Parents: ebdc8b2
Author: Esteban Laver <em...@us.ibm.com>
Authored: Fri Jan 12 00:26:29 2018 -0500
Committer: Luciano Resende <lr...@apache.org>
Committed: Wed Jan 24 23:47:19 2018 -0500
----------------------------------------------------------------------
.../apache/bahir/cloudant/DefaultSource.scala | 243 ++++++++++---------
1 file changed, 122 insertions(+), 121 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/bahir/blob/785ee1e1/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/DefaultSource.scala
----------------------------------------------------------------------
diff --git a/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/DefaultSource.scala b/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/DefaultSource.scala
index 36c2c78..2685993 100644
--- a/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/DefaultSource.scala
+++ b/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/DefaultSource.scala
@@ -30,57 +30,58 @@ import org.apache.bahir.cloudant.internal.ChangesReceiver
case class CloudantReadWriteRelation (config: CloudantConfig,
schema: StructType,
dataFrame: DataFrame = null)
- (@transient val sqlContext: SQLContext)
+ (@transient val sqlContext: SQLContext)
extends BaseRelation with PrunedFilteredScan with InsertableRelation {
- @transient lazy val dataAccess: JsonStoreDataAccess = {new JsonStoreDataAccess(config)}
+ @transient lazy val dataAccess: JsonStoreDataAccess = {new JsonStoreDataAccess(config)}
- implicit lazy val logger: Logger = LoggerFactory.getLogger(getClass)
+ implicit lazy val logger: Logger = LoggerFactory.getLogger(getClass)
- def buildScan(requiredColumns: Array[String],
+ import sqlContext.implicits._
+
+ def buildScan(requiredColumns: Array[String],
filters: Array[Filter]): RDD[Row] = {
- val colsLength = requiredColumns.length
+ val colsLength = requiredColumns.length
- if (dataFrame != null) {
- if (colsLength == 0) {
- dataFrame.select().rdd
- } else if (colsLength == 1) {
- dataFrame.select(requiredColumns(0)).rdd
- } else {
- val colsExceptCol0 = for (i <- 1 until colsLength) yield requiredColumns(i)
- dataFrame.select(requiredColumns(0), colsExceptCol0: _*).rdd
- }
+ if (dataFrame != null) {
+ if (colsLength == 0) {
+ dataFrame.select().rdd
+ } else if (colsLength == 1) {
+ dataFrame.select(requiredColumns(0)).rdd
} else {
- implicit val columns : Array[String] = requiredColumns
- implicit val origFilters : Array[Filter] = filters
-
- logger.info("buildScan:" + columns + "," + origFilters)
- val cloudantRDD = new JsonStoreRDD(sqlContext.sparkContext, config)
- val df = sqlContext.read.json(cloudantRDD)
- if (colsLength > 1) {
- val colsExceptCol0 = for (i <- 1 until colsLength) yield requiredColumns(i)
- df.select(requiredColumns(0), colsExceptCol0: _*).rdd
- } else {
- df.rdd
- }
+ val colsExceptCol0 = for (i <- 1 until colsLength) yield requiredColumns(i)
+ dataFrame.select(requiredColumns(0), colsExceptCol0: _*).rdd
+ }
+ } else {
+ implicit val columns : Array[String] = requiredColumns
+ implicit val origFilters : Array[Filter] = filters
+
+ logger.info("buildScan:" + columns + "," + origFilters)
+ val cloudantRDD = new JsonStoreRDD(sqlContext.sparkContext, config)
+ val df = sqlContext.read.json(cloudantRDD.toDS())
+ if (colsLength > 1) {
+ val colsExceptCol0 = for (i <- 1 until colsLength) yield requiredColumns(i)
+ df.select(requiredColumns(0), colsExceptCol0: _*).rdd
+ } else {
+ df.rdd
}
}
-
+ }
def insert(data: DataFrame, overwrite: Boolean): Unit = {
- if (config.getCreateDBonSave) {
- dataAccess.createDB()
- }
- if (data.count() == 0) {
- logger.warn("Database " + config.getDbname +
- ": nothing was saved because the number of records was 0!")
- } else {
- val result = data.toJSON.foreachPartition { x =>
- val list = x.toList // Has to pass as List, Iterator results in 0 data
- dataAccess.saveAll(list)
- }
+ if (config.getCreateDBonSave) {
+ dataAccess.createDB()
+ }
+ if (data.count() == 0) {
+ logger.warn("Database " + config.getDbname +
+ ": nothing was saved because the number of records was 0!")
+ } else {
+ val result = data.toJSON.foreachPartition { x =>
+ val list = x.toList // Has to pass as List, Iterator results in 0 data
+ dataAccess.saveAll(list)
}
}
+ }
}
class DefaultSource extends RelationProvider
@@ -91,97 +92,97 @@ class DefaultSource extends RelationProvider
def createRelation(sqlContext: SQLContext,
parameters: Map[String, String]): CloudantReadWriteRelation = {
- create(sqlContext, parameters, null)
- }
-
- private def create(sqlContext: SQLContext,
- parameters: Map[String, String],
- inSchema: StructType) = {
-
- val config: CloudantConfig = JsonStoreConfigManager.getConfig(sqlContext, parameters)
-
- var dataFrame: DataFrame = null
-
- val schema: StructType = {
- if (inSchema != null) {
- inSchema
- } else if (!config.isInstanceOf[CloudantChangesConfig]
- || config.viewName != null || config.indexName != null) {
- val df = if (config.getSchemaSampleSize ==
- JsonStoreConfigManager.ALLDOCS_OR_CHANGES_LIMIT &&
- config.viewName == null
- && config.indexName == null) {
- val cloudantRDD = new JsonStoreRDD(sqlContext.sparkContext, config)
- dataFrame = sqlContext.read.json(cloudantRDD)
- dataFrame
- } else {
- val dataAccess = new JsonStoreDataAccess(config)
- val aRDD = sqlContext.sparkContext.parallelize(
- dataAccess.getMany(config.getSchemaSampleSize))
- sqlContext.read.json(aRDD)
- }
- df.schema
+ create(sqlContext, parameters, null)
+ }
+
+ private def create(sqlContext: SQLContext,
+ parameters: Map[String, String],
+ inSchema: StructType) = {
+
+ import sqlContext.implicits._
+
+ val config: CloudantConfig = JsonStoreConfigManager.getConfig(sqlContext, parameters)
+
+ var dataFrame: DataFrame = null
+
+ val schema: StructType = {
+ if (inSchema != null) {
+ inSchema
+ } else if (!config.isInstanceOf[CloudantChangesConfig]
+ || config.viewName != null || config.indexName != null) {
+ val df = if (config.getSchemaSampleSize ==
+ JsonStoreConfigManager.ALLDOCS_OR_CHANGES_LIMIT &&
+ config.viewName == null
+ && config.indexName == null) {
+ val cloudantRDD = new JsonStoreRDD(sqlContext.sparkContext, config)
+ dataFrame = sqlContext.read.json(cloudantRDD.toDS())
+ dataFrame
} else {
- /* Create a streaming context to handle transforming docs in
- * larger databases into Spark datasets
- */
- val changesConfig = config.asInstanceOf[CloudantChangesConfig]
- val ssc = new StreamingContext(sqlContext.sparkContext,
- Seconds(changesConfig.getBatchInterval))
-
- val changes = ssc.receiverStream(
- new ChangesReceiver(changesConfig))
- changes.persist(changesConfig.getStorageLevelForStreaming)
-
- // Global RDD that's created from union of all RDDs
- var globalRDD = ssc.sparkContext.emptyRDD[String]
-
- logger.info("Loading data from Cloudant using "
- + changesConfig.getChangesReceiverUrl)
-
- // Collect and union each RDD to convert all RDDs to a DataFrame
- changes.foreachRDD((rdd: RDD[String]) => {
- if (!rdd.isEmpty()) {
- if (globalRDD != null) {
- // Union RDDs in foreach loop
- globalRDD = globalRDD.union(rdd)
- } else {
- globalRDD = rdd
- }
+ val dataAccess = new JsonStoreDataAccess(config)
+ val aRDD = sqlContext.sparkContext.parallelize(
+ dataAccess.getMany(config.getSchemaSampleSize))
+ sqlContext.read.json(aRDD.toDS())
+ }
+ df.schema
+ } else {
+ /* Create a streaming context to handle transforming docs in
+ * larger databases into Spark datasets
+ */
+ val changesConfig = config.asInstanceOf[CloudantChangesConfig]
+ val ssc = new StreamingContext(sqlContext.sparkContext,
+ Seconds(changesConfig.getBatchInterval))
+
+ val changes = ssc.receiverStream(new ChangesReceiver(changesConfig))
+ changes.persist(changesConfig.getStorageLevelForStreaming)
+
+ // Global RDD that's created from union of all RDDs
+ var globalRDD = ssc.sparkContext.emptyRDD[String]
+
+ logger.info("Loading data from Cloudant using "
+ + changesConfig.getChangesReceiverUrl)
+
+ // Collect and union each RDD to convert all RDDs to a DataFrame
+ changes.foreachRDD((rdd: RDD[String]) => {
+ if (!rdd.isEmpty()) {
+ if (globalRDD != null) {
+ // Union RDDs in foreach loop
+ globalRDD = globalRDD.union(rdd)
} else {
- // Convert final global RDD[String] to DataFrame
- dataFrame = sqlContext.sparkSession.read.json(globalRDD)
- ssc.stop(stopSparkContext = false, stopGracefully = false)
+ globalRDD = rdd
}
- })
-
- ssc.start
- // run streaming until all docs from continuous feed are received
- ssc.awaitTermination
-
- if(dataFrame.schema.nonEmpty) {
- dataFrame.schema
} else {
- throw new CloudantException(CloudantChangesConfig.receiverErrorMsg)
+ // Convert final global RDD[String] to DataFrame
+ dataFrame = sqlContext.sparkSession.read.json(globalRDD.toDS())
+ ssc.stop(stopSparkContext = false, stopGracefully = false)
}
+ })
+
+ ssc.start
+ // run streaming until all docs from continuous feed are received
+ ssc.awaitTermination
+
+ if(dataFrame.schema.nonEmpty) {
+ dataFrame.schema
+ } else {
+ throw new CloudantException(CloudantChangesConfig.receiverErrorMsg)
}
}
- CloudantReadWriteRelation(config, schema, dataFrame)(sqlContext)
- }
-
- def createRelation(sqlContext: SQLContext,
- mode: SaveMode,
- parameters: Map[String, String],
- data: DataFrame): CloudantReadWriteRelation = {
- val relation = create(sqlContext, parameters, data.schema)
- relation.insert(data, mode==SaveMode.Overwrite)
- relation
}
+ CloudantReadWriteRelation(config, schema, dataFrame)(sqlContext)
+ }
- def createRelation(sqlContext: SQLContext,
- parameters: Map[String, String],
- schema: StructType): CloudantReadWriteRelation = {
- create(sqlContext, parameters, schema)
- }
+ def createRelation(sqlContext: SQLContext,
+ mode: SaveMode,
+ parameters: Map[String, String],
+ data: DataFrame): CloudantReadWriteRelation = {
+ val relation = create(sqlContext, parameters, data.schema)
+ relation.insert(data, mode==SaveMode.Overwrite)
+ relation
+ }
+ def createRelation(sqlContext: SQLContext,
+ parameters: Map[String, String],
+ schema: StructType): CloudantReadWriteRelation = {
+ create(sqlContext, parameters, schema)
+ }
}