You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by ad...@apache.org on 2019/01/23 17:54:20 UTC

[kudu] 06/08: [examples] a small update on SparkExample

This is an automated email from the ASF dual-hosted git repository.

adar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit 7f1d02f90ba8236f441ad8fe2fa63f07ed3c200d
Author: Alexey Serbin <al...@apache.org>
AuthorDate: Wed Jan 9 19:38:32 2019 -0800

    [examples] a small update on SparkExample
    
    Updated the Spark example:
      * added replication factor as a parameter for the application
        (by default set to 1)
      * don't try to drop the test table if it hasn't been created
      * catch and log the information about caught exceptions
      * other petty changes
    
    Change-Id: Icf8c4e675ca6c240582242658fa8173a1ccd271d
    Reviewed-on: http://gerrit.cloudera.org:8080/12208
    Tested-by: Kudu Jenkins
    Reviewed-by: Andrew Wong <aw...@cloudera.com>
    Reviewed-by: Will Berkeley <wd...@gmail.com>
---
 .../apache/kudu/spark/examples/SparkExample.scala  | 71 ++++++++++++++++------
 1 file changed, 54 insertions(+), 17 deletions(-)

diff --git a/examples/scala/spark-example/src/main/scala/org/apache/kudu/spark/examples/SparkExample.scala b/examples/scala/spark-example/src/main/scala/org/apache/kudu/spark/examples/SparkExample.scala
index 27b7fd5..9a30740 100644
--- a/examples/scala/spark-example/src/main/scala/org/apache/kudu/spark/examples/SparkExample.scala
+++ b/examples/scala/spark-example/src/main/scala/org/apache/kudu/spark/examples/SparkExample.scala
@@ -29,8 +29,26 @@ import org.apache.spark.sql.Row
 
 object SparkExample {
 
-  val kuduMasters: String = System.getProperty("kuduMasters", "localhost:7051")   // Kudu master address list.
-  val tableName: String = System.getProperty("tableName", "spark_test") // Name of table you wish to create.
+  // The list of RPC endpoints of Kudu masters in the cluster,
+  // separated by commas. The default value assumes the following:
+  //   * the cluster runs a single Kudu master
+  //   * the Kudu master runs at the default RPC port
+  //   * spark-submit is run on the Kudu master's node
+  val kuduMasters: String = System.getProperty("kuduMasters", "localhost:7051")
+
+  // The name of a table to create.
+  val tableName: String = System.getProperty("tableName", "kudu_spark_example")
+
+  // The replication factor of the table to create. Make sure at least this
+  // number of tablet servers are available when running this example app.
+  // Replication factors of 1, 3, 5, 7 are available out-of-the-box.
+  // The default value of 1 is chosen to provide maximum environment
+  // compatibility, but it's good only for small toy examples like this.
+  // For real-world scenarios a replication factor of 1 (i.e. keeping the
+  // table's data non-replicated) is a bad idea in general: consider a
+  // replication factor of 3 or higher.
+  val tableNumReplicas: Int = Integer.getInteger("tableNumReplicas", 1)
+
   val nameCol = "name"
   val idCol = "id"
 
@@ -45,7 +63,8 @@ object SparkExample {
   def main(args: Array[String]) {
     // Define our session and context variables for use throughout the program.
     // The kuduContext is a serializable container for Kudu client connections,
-    // while the SparkSession is the entry point to SparkSQL and the Dataset/DataFrame API.
+    // while the SparkSession is the entry point to SparkSQL and
+    // the Dataset/DataFrame API.
     val spark = SparkSession.builder.appName("KuduSparkExample").getOrCreate()
     val kuduContext = new KuduContext(kuduMasters, spark.sqlContext.sparkContext)
 
@@ -61,30 +80,42 @@ object SparkExample {
                        )
                   )
 
+    var tableIsCreated = false
     try {
-      // Create the table, with 3 partitions (tablets) and default number of replicas,
-      // as set by the Kudu service configuration.
-      if (!kuduContext.tableExists(tableName)) {
-        kuduContext.createTable(tableName, schema, Seq(idCol),
-          new CreateTableOptions().addHashPartitions(List(idCol).asJava, 3))
+      // Make sure the table does not exist. This is mostly to demonstrate
+      // the capabilities of the API. In general, there might be a racing
+      // request to create the table coming from elsewhere, so even
+      // if tableExists() returned false at this time, the table might appear
+      // while createTable() is running below. In that case, an appropriate
+      // Kudu exception will be thrown by createTable().
+      if (kuduContext.tableExists(tableName)) {
+        throw new RuntimeException(tableName + ": table already exists")
       }
 
+      // Create the table with 3 hash partitions, resulting in 3 tablets,
+      // each with the specified number of replicas.
+      kuduContext.createTable(tableName, schema, Seq(idCol),
+        new CreateTableOptions()
+          .addHashPartitions(List(idCol).asJava, 3)
+          .setNumReplicas(tableNumReplicas))
+      tableIsCreated = true
+
       // Write to the table.
-      logger.info(s"Writing to table '$tableName'")
+      logger.info(s"writing to table '$tableName'")
       val data = Array(User("userA", 1234), User("userB", 5678))
       val userRDD = spark.sparkContext.parallelize(data)
       val userDF = userRDD.toDF()
       kuduContext.insertRows(userDF, tableName)
 
       // Read from the table using an RDD.
-      logger.info("Reading back the rows just written")
+      logger.info(s"reading back the rows just written")
       val readCols = Seq(nameCol, idCol)
       val readRDD = kuduContext.kuduRDD(spark.sparkContext, tableName, readCols)
       val userTuple = readRDD.map { case Row(name: String, id: Int) => (name, id) }
       userTuple.collect().foreach(println(_))
 
       // Upsert some rows.
-      logger.info(s"Upserting to table '$tableName'")
+      logger.info(s"upserting to table '$tableName'")
       val upsertUsers = Array(User("newUserA", 1234), User("userC", 7777))
       val upsertUsersRDD = spark.sparkContext.parallelize(upsertUsers)
       val upsertUsersDF = upsertUsersRDD.toDF()
@@ -94,13 +125,19 @@ object SparkExample {
       val sqlDF = spark.sqlContext.read.options(
         Map("kudu.master" -> kuduMasters, "kudu.table" -> tableName)).kudu
       sqlDF.createOrReplaceTempView(tableName)
-      spark.sqlContext.sql(s"select * from $tableName where $idCol > 1000").show
-    }
-
-    finally {
+      spark.sqlContext.sql(s"SELECT * FROM $tableName WHERE $idCol > 1000").show
+    } catch {
+      // Catch, log and re-throw. Not the best practice, but this is a very
+      // simplistic example.
+      case unknown : Throwable => logger.error(s"got an exception: " + unknown)
+      throw unknown
+    } finally {
       // Clean up.
-      logger.info(s"Deleting table '$tableName' and closing down the session")
-      kuduContext.deleteTable(tableName)
+      if (tableIsCreated) {
+        logger.info(s"deleting table '$tableName'")
+        kuduContext.deleteTable(tableName)
+      }
+      logger.info(s"closing down the session")
       spark.close()
     }
   }