You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by ad...@apache.org on 2019/01/23 17:54:20 UTC
[kudu] 06/08: [examples] a small update on SparkExample
This is an automated email from the ASF dual-hosted git repository.
adar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git
commit 7f1d02f90ba8236f441ad8fe2fa63f07ed3c200d
Author: Alexey Serbin <al...@apache.org>
AuthorDate: Wed Jan 9 19:38:32 2019 -0800
[examples] a small update on SparkExample
Updated the Spark example:
* added replication factor as a parameter for the application
(by default set to 1)
* don't try to drop the test table if it hasn't been created
* catch and log information about exceptions
* other petty changes
Change-Id: Icf8c4e675ca6c240582242658fa8173a1ccd271d
Reviewed-on: http://gerrit.cloudera.org:8080/12208
Tested-by: Kudu Jenkins
Reviewed-by: Andrew Wong <aw...@cloudera.com>
Reviewed-by: Will Berkeley <wd...@gmail.com>
---
.../apache/kudu/spark/examples/SparkExample.scala | 71 ++++++++++++++++------
1 file changed, 54 insertions(+), 17 deletions(-)
diff --git a/examples/scala/spark-example/src/main/scala/org/apache/kudu/spark/examples/SparkExample.scala b/examples/scala/spark-example/src/main/scala/org/apache/kudu/spark/examples/SparkExample.scala
index 27b7fd5..9a30740 100644
--- a/examples/scala/spark-example/src/main/scala/org/apache/kudu/spark/examples/SparkExample.scala
+++ b/examples/scala/spark-example/src/main/scala/org/apache/kudu/spark/examples/SparkExample.scala
@@ -29,8 +29,26 @@ import org.apache.spark.sql.Row
object SparkExample {
- val kuduMasters: String = System.getProperty("kuduMasters", "localhost:7051") // Kudu master address list.
- val tableName: String = System.getProperty("tableName", "spark_test") // Name of table you wish to create.
+ // The list of RPC endpoints of Kudu masters in the cluster,
+ // separated by comma. The default value assumes the following:
+ // * the cluster runs a single Kudu master
+ // * the Kudu master runs at the default RPC port
+ // * the spark-submit is run at the Kudu master's node
+ val kuduMasters: String = System.getProperty("kuduMasters", "localhost:7051")
+
+ // The name of a table to create.
+ val tableName: String = System.getProperty("tableName", "kudu_spark_example")
+
+ // The replication factor of the table to create. Make sure at least this
+ // number of tablet servers are available when running this example app.
+ // Replication factors of 1, 3, 5, 7 are available out-of-the-box.
+ // The default value of 1 is chosen to provide maximum environment
+ // compatibility, but it's good only for small toy examples like this.
+ // For real-world scenarios the replication factor of 1 (i.e. keeping table's
+ // data non-replicated) is a bad idea in general: consider the replication
+ // factor of 3 and higher.
+ val tableNumReplicas: Int = Integer.getInteger("tableNumReplicas", 1)
+
val nameCol = "name"
val idCol = "id"
@@ -45,7 +63,8 @@ object SparkExample {
def main(args: Array[String]) {
// Define our session and context variables for use throughout the program.
// The kuduContext is a serializable container for Kudu client connections,
- // while the SparkSession is the entry point to SparkSQL and the Dataset/DataFrame API.
+ // while the SparkSession is the entry point to SparkSQL and
+ // the Dataset/DataFrame API.
val spark = SparkSession.builder.appName("KuduSparkExample").getOrCreate()
val kuduContext = new KuduContext(kuduMasters, spark.sqlContext.sparkContext)
@@ -61,30 +80,42 @@ object SparkExample {
)
)
+ var tableIsCreated = false
try {
- // Create the table, with 3 partitions (tablets) and default number of replicas,
- // as set by the Kudu service configuration.
- if (!kuduContext.tableExists(tableName)) {
- kuduContext.createTable(tableName, schema, Seq(idCol),
- new CreateTableOptions().addHashPartitions(List(idCol).asJava, 3))
+ // Make sure the table does not exist. This is mostly to demonstrate
+ // the capabilities of the API. In general, there might be a racing
+ // request to create the table coming from elsewhere, so even
+ // if tableExists() returned false at this time, the table might appear
+ // while createTable() is running below. In the latter case, appropriate
+ // Kudu exception will be thrown by createTable().
+ if (kuduContext.tableExists(tableName)) {
+ throw new RuntimeException(tableName + ": table already exists")
}
+ // Create the table with 3 hash partitions, resulting in 3 tablets,
+ // each with the specified number of replicas.
+ kuduContext.createTable(tableName, schema, Seq(idCol),
+ new CreateTableOptions()
+ .addHashPartitions(List(idCol).asJava, 3)
+ .setNumReplicas(tableNumReplicas))
+ tableIsCreated = true
+
// Write to the table.
- logger.info(s"Writing to table '$tableName'")
+ logger.info(s"writing to table '$tableName'")
val data = Array(User("userA", 1234), User("userB", 5678))
val userRDD = spark.sparkContext.parallelize(data)
val userDF = userRDD.toDF()
kuduContext.insertRows(userDF, tableName)
// Read from the table using an RDD.
- logger.info("Reading back the rows just written")
+ logger.info(s"reading back the rows just written")
val readCols = Seq(nameCol, idCol)
val readRDD = kuduContext.kuduRDD(spark.sparkContext, tableName, readCols)
val userTuple = readRDD.map { case Row(name: String, id: Int) => (name, id) }
userTuple.collect().foreach(println(_))
// Upsert some rows.
- logger.info(s"Upserting to table '$tableName'")
+ logger.info(s"upserting to table '$tableName'")
val upsertUsers = Array(User("newUserA", 1234), User("userC", 7777))
val upsertUsersRDD = spark.sparkContext.parallelize(upsertUsers)
val upsertUsersDF = upsertUsersRDD.toDF()
@@ -94,13 +125,19 @@ object SparkExample {
val sqlDF = spark.sqlContext.read.options(
Map("kudu.master" -> kuduMasters, "kudu.table" -> tableName)).kudu
sqlDF.createOrReplaceTempView(tableName)
- spark.sqlContext.sql(s"select * from $tableName where $idCol > 1000").show
- }
-
- finally {
+ spark.sqlContext.sql(s"SELECT * FROM $tableName WHERE $idCol > 1000").show
+ } catch {
+ // Catch, log and re-throw. Not the best practice, but this is a very
+ // simplistic example.
+ case unknown : Throwable => logger.error(s"got an exception: " + unknown)
+ throw unknown
+ } finally {
// Clean up.
- logger.info(s"Deleting table '$tableName' and closing down the session")
- kuduContext.deleteTable(tableName)
+ if (tableIsCreated) {
+ logger.info(s"deleting table '$tableName'")
+ kuduContext.deleteTable(tableName)
+ }
+ logger.info(s"closing down the session")
spark.close()
}
}