Posted to commits@bigtop.apache.org by ja...@apache.org on 2014/12/18 03:11:57 UTC

bigtop git commit: BIGTOP-1273. BigPetStore Cleanup formatting.

Repository: bigtop
Updated Branches:
  refs/heads/master fe15ba18b -> 388beca8e


BIGTOP-1273. BigPetStore Cleanup formatting.


Project: http://git-wip-us.apache.org/repos/asf/bigtop/repo
Commit: http://git-wip-us.apache.org/repos/asf/bigtop/commit/388beca8
Tree: http://git-wip-us.apache.org/repos/asf/bigtop/tree/388beca8
Diff: http://git-wip-us.apache.org/repos/asf/bigtop/diff/388beca8

Branch: refs/heads/master
Commit: 388beca8e1b30133024ce3c38ce8785b09f3ba10
Parents: fe15ba1
Author: jayunit100 <ja...@gmail.com>
Authored: Mon Dec 15 22:24:02 2014 -0500
Committer: jayunit100 <ja...@apache.org>
Committed: Wed Dec 17 21:11:14 2014 -0500

----------------------------------------------------------------------
 .../spark/generator/SparkDriver.scala           | 208 ++++++++++---------
 .../apache/bigpetstore/spark/etl/ETLSuite.scala |  31 ++-
 2 files changed, 125 insertions(+), 114 deletions(-)
----------------------------------------------------------------------
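
The driver below is normally launched via spark-submit with positional
arguments (outputDir nStores nCustomers simulationLength [seed]), but its
pieces can also be composed programmatically. Here is a minimal local-mode
sketch, assuming only the SparkDriver object and method names visible in the
diff below; the argument values are illustrative:

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.bigtop.bigpetstore.spark.generator.SparkDriver

    object LocalRun {
      def main(args: Array[String]): Unit = {
        // outputDir, nStores, nCustomers, simulationLength (days), optional seed
        SparkDriver.parseArgs(Array("/tmp/bps", "5", "100", "30.0", "42"))
        val sc = new SparkContext(
          new SparkConf().setAppName("BPS Data Generator").setMaster("local[2]"))
        val transactions = SparkDriver.generateData(sc) // defines and counts the RDD
        SparkDriver.writeData(transactions)             // writes <outputDir>/transactions
        sc.stop()
      }
    }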


http://git-wip-us.apache.org/repos/asf/bigtop/blob/388beca8/bigtop-bigpetstore/bigpetstore-spark/src/main/scala/org/apache/bigpetstore/spark/generator/SparkDriver.scala
----------------------------------------------------------------------
diff --git a/bigtop-bigpetstore/bigpetstore-spark/src/main/scala/org/apache/bigpetstore/spark/generator/SparkDriver.scala b/bigtop-bigpetstore/bigpetstore-spark/src/main/scala/org/apache/bigpetstore/spark/generator/SparkDriver.scala
index 2d7ed17..19d1565 100644
--- a/bigtop-bigpetstore/bigpetstore-spark/src/main/scala/org/apache/bigpetstore/spark/generator/SparkDriver.scala
+++ b/bigtop-bigpetstore/bigpetstore-spark/src/main/scala/org/apache/bigpetstore/spark/generator/SparkDriver.scala
@@ -17,10 +17,11 @@
 
 package org.apache.bigtop.bigpetstore.spark.generator
 
-import com.github.rnowling.bps.datagenerator.datamodels.{Store,Customer,PurchasingProfile,Transaction}
+import com.github.rnowling.bps.datagenerator.datamodels.inputs.ZipcodeRecord
+import com.github.rnowling.bps.datagenerator.datamodels._
 import com.github.rnowling.bps.datagenerator.{DataLoader,StoreGenerator,CustomerGenerator => CustGen, PurchasingProfileGenerator,TransactionGenerator}
 import com.github.rnowling.bps.datagenerator.framework.SeedFactory
-
+import scala.collection.JavaConversions._
 import org.apache.spark.{SparkContext, SparkConf}
 import org.apache.spark.SparkContext._
 import org.apache.spark.rdd._
@@ -30,27 +31,34 @@ import scala.util.Random
 import java.io.File
 import java.util.Date
 
+/**
+ * This driver uses the data generator API to generate
+ * an arbitrarily large data set of petstore transactions.
+ *
+ * Each "transaction" consists of many "products", each of which
+ * is stringified into what is often called a "line item".
+ *
+ * Spark then writes those line items out as a distributed Hadoop file glob.
+ *
+ */
 object SparkDriver {
   private var nStores: Int = -1
   private var nCustomers: Int = -1
   private var simulationLength: Double = -1.0
   private var seed: Long = -1
   private var outputDir: String = ""
-
   private val NPARAMS = 5
 
   private def printUsage() {
-    val usage: String = "BigPetStore Data Generator\n" +
-      "\n" +
+    val usage: String =
+      "BigPetStore Data Generator.\n" +
       "Usage: spark-submit ... outputDir nStores nCustomers simulationLength [seed]\n" +
-      "\n" +
       "outputDir - (string) directory to write files\n" +
       "nStores - (int) number of stores to generate\n" +
       "nCustomers - (int) number of customers to generate\n" +
       "simulationLength - (float) number of days to simulate\n" +
       "seed - (long) seed for RNG. If not given, one is reandomly generated.\n"
-
-    println(usage)
+    System.err.println(usage)
   }
 
   def parseArgs(args: Array[String]) {
@@ -58,52 +66,43 @@ object SparkDriver {
       printUsage()
       System.exit(1)
     }
-
-    var i = 0
-
-    outputDir = args(i)
-
-    i += 1
+    outputDir = args(0)
     try {
-      nStores = args(i).toInt
+      nStores = args(1).toInt
     }
     catch {
       case _ : NumberFormatException =>
-        System.err.println("Unable to parse '" + args(i) + "' as an integer for nStores.\n")
+        System.err.println("Unable to parse '" + args(1) + "' as an integer for nStores.\n")
         printUsage()
         System.exit(1)
     }
-
-    i += 1
     try {
-      nCustomers = args(i).toInt
+      nCustomers = args(2).toInt
     }
     catch {
       case _ : NumberFormatException =>
-        System.err.println("Unable to parse '" + args(i) + "' as an integer for nCustomers.\n")
+        System.err.println("Unable to parse '" + args(2) + "' as an integer for nCustomers.\n")
         printUsage()
         System.exit(1)
     }
-
-    i += 1
     try {
-      simulationLength = args(i).toDouble
+      simulationLength = args(3).toDouble
     }
     catch {
       case _ : NumberFormatException =>
-        System.err.println("Unable to parse '" + args(i) + "' as a float for simulationLength.\n")
+        System.err.println("Unable to parse '" + args(3) + "' as a float for simulationLength.\n")
         printUsage()
         System.exit(1)
     }
 
+    // If the optional seed argument is present, parse it; otherwise one is generated randomly.
     if(args.length == NPARAMS) {
-      i += 1
       try {
-        seed = args(i).toLong
+        seed = args(4).toLong
       }
       catch {
         case _ : NumberFormatException =>
-          System.err.println("Unable to parse '" + args(i) + "' as a long for seed.\n")
+          System.err.println("Unable to parse '" + args(4) + "' as a long for seed.\n")
           printUsage()
           System.exit(1)
       }
@@ -113,13 +112,18 @@ object SparkDriver {
     }
   }
 
+  /**
+   * Here we generate an RDD of all the petstore transactions
+   * by generating the static data first (stores, customers, ...),
+   * then running the simulation as a distributed Spark task.
+   */
   def generateData(sc: SparkContext): RDD[Transaction] = {
     val inputData = new DataLoader().loadData()
-    val seedFactory = new SeedFactory(seed);
+    val seedFactory = new SeedFactory(seed)
 
     println("Generating stores...")
     val stores : ArrayList[Store] = new ArrayList()
-    val storeGenerator = new StoreGenerator(inputData, seedFactory);
+    val storeGenerator = new StoreGenerator(inputData, seedFactory)
     for(i <- 1 to nStores) {
       val store = storeGenerator.generate()
       stores.add(store)
@@ -133,101 +137,111 @@ object SparkDriver {
       val customer = custGen.generate()
       customers = customer :: customers
     }
-    println("Done.")
+    println("...Done generating customers.")
 
-    println("Broadcasting stores and products")
+    println("Broadcasting stores and products...")
     val storesBC = sc.broadcast(stores)
     val productBC = sc.broadcast(inputData.getProductCategories())
     val customerRDD = sc.parallelize(customers)
     val nextSeed = seedFactory.getNextSeed()
-
-    println("Defining transaction DAG")
-    val transactionRDD = customerRDD.mapPartitionsWithIndex { (index, custIter) =>
-      val seedFactory = new SeedFactory(nextSeed ^ index)
-      val transactionIter = custIter.map{ customer =>
-	val products = productBC.value
-
-        val profileGen = new PurchasingProfileGenerator(products, seedFactory)
-        val profile = profileGen.generate()
-
-        val transGen = new TransactionGenerator(customer, profile, storesBC.value, products,
-          seedFactory)
-
-        var transactions : List[Transaction] = List()
-	var transaction = transGen.generate()
-        while(transaction.getDateTime() < simulationLength) {
-          transactions = transaction :: transactions
-
-          transaction = transGen.generate()
+    println("...Done broadcasting stores and products.")
+
+    println("Defining transaction DAG...")
+
+    /**
+     *  See the inline comments below for how Transaction
+     *  objects are generated from each Customer.
+     */
+    val transactionRDD = customerRDD.mapPartitionsWithIndex{
+      (index, custIter) =>
+        // Create a new RNG, seeded deterministically from the partition index.
+        val seedFactory = new SeedFactory(nextSeed ^ index)
+        val transactionIter = custIter.map{
+          customer =>
+            val products = productBC.value
+            // Create a new purchasing profile for this customer.
+            val profileGen = new PurchasingProfileGenerator(products, seedFactory)
+            val profile = profileGen.generate()
+            val transGen = new TransactionGenerator(customer, profile, storesBC.value, products, seedFactory)
+            var transactions : List[Transaction] = List()
+            var transaction = transGen.generate()
+
+            // Build the list of this customer's transactions for the simulated time period.
+            while(transaction.getDateTime() < simulationLength) {
+              transactions = transaction :: transactions
+              transaction = transGen.generate()
+            }
+            // Finally, return the list of transactions produced above.
+            transactions
        }
-
-	transactions
-      }
       transactionIter
-    }.flatMap( s => s)
+    }.flatMap(s => s)
+
+    println("...Done defining transaction DAG.")
 
     println("Generating transactions...")
-    val nTrans = transactionRDD.count()
-    println(s"Generated $nTrans transactions.")
 
+    // count() forces materialization of the RDD defined above.
+    val nTrans = transactionRDD.count()
+    println(s"...Done generating $nTrans transactions.")
+
+    /**
+     *  Return the RDD representing all the petstore transactions.
+     *  This RDD contains a distributed collection of instances where
+     *  a customer went to a pet store and bought a variable number of items.
+     *  We can then serialize all the contents to disk.
+     */
     transactionRDD
   }
 
+  /** Render one (transaction, product) pair as a single CSV "line item" record. */
+  def lineItem(t: Transaction, date: Date, p: Product): String = {
+      t.getStore.getId + "," +
+      t.getStore.getLocation.getZipcode + "," +
+      t.getStore.getLocation.getCity + "," +
+      t.getStore.getLocation.getState + "," +
+      t.getCustomer.getId + "," +
+      t.getCustomer.getName.getFirst + "," + t.getCustomer.getName.getSecond + "," +
+      t.getCustomer.getLocation.getZipcode + "," +
+      t.getCustomer.getLocation.getCity + "," +
+      t.getCustomer.getLocation.getState + "," +
+      t.getId + "," +
+      date + "," + p
+  }
   def writeData(transactionRDD : RDD[Transaction]) {
     val initialDate : Long = new Date().getTime()
 
-    val transactionStringsRDD = transactionRDD.map { t =>
-      var records : List[String] = List()
-      val products = t.getProducts()
-      for(i <- 0 until products.size()) {
-        val p = products.get(i)
-	val name = t.getCustomer().getName()
-        val custLocation = t.getCustomer().getLocation()
-        val storeLocation = t.getStore().getLocation()
-
-        // days -> milliseconds = days * 24 h / day * 60 min / hr * 60 sec / min * 1000 ms / sec
-        val dateMS = (t.getDateTime * 24.0 * 60.0 * 60.0 * 1000.0).toLong
-        val date = new Date(initialDate + dateMS)
-
-
-        var record = ""
-        record += t.getStore().getId() + ","
-        record += storeLocation.getZipcode() + ","
-        record += storeLocation.getCity() + ","
-        record += storeLocation.getState() + ","
-
-        record += t.getCustomer().getId() + ","
-	record += name.getFirst() + "," + name.getSecond() + ","
-	record += custLocation.getZipcode() + ","
-	record += custLocation.getCity() + ","
-	record += custLocation.getState() + ","
-
-        record += t.getId() + ","
-        record += date + ","
-	record += p
-
-        records = record :: records
-      }
+    val transactionStringsRDD = transactionRDD.map {
+      transaction =>
+        val products = transaction.getProducts()
+
+        /*********************************************************
+        * Map the products of each single transaction to strings,
+        * one per item purchased.
+        *
+        * Flattening the result below yields an RDD of strings, where
+        * each string represents one instance of an item purchase.
+        * ********************************************************/
+        val records = products.map{
+          product =>
+            // days -> milliseconds = days * 24 h / day * 60 min / hr * 60 sec / min * 1000 ms / sec
+            val dateMS = (transaction.getDateTime * 24.0 * 60.0 * 60.0 * 1000.0).toLong
+            // Return a stringified "line item", which represents a single item bought.
+            lineItem(transaction, new Date(initialDate + dateMS), product)
+        }
+
       records
-    }.flatMap { s => s }
-
+    }.flatMap(s => s)
+    // Distributed serialization of the records to part-* files under outputDir/transactions...
     transactionStringsRDD.saveAsTextFile(outputDir + "/transactions")
   }
 
   def main(args: Array[String]) {
     parseArgs(args)
-
-    println("Creating SparkConf")
     val conf = new SparkConf().setAppName("BPS Data Generator")
-
-    println("Creating SparkContext")
     val sc = new SparkContext(conf)
-
     val transactionRDD = generateData(sc)
-
     writeData(transactionRDD)
-
     sc.stop()
   }
 }
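
A note on determinism: the mapPartitionsWithIndex block in generateData derives
a per-partition seed by XOR-ing the driver's next seed with the partition
index, so a fixed seed and a fixed partitioning reproduce the same data set.
A standalone sketch of the same pattern, substituting scala.util.Random for the
datagenerator's SeedFactory:

    import org.apache.spark.{SparkConf, SparkContext}
    import scala.util.Random

    object SeedingSketch {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(
          new SparkConf().setAppName("seeding-sketch").setMaster("local[4]"))
        val baseSeed = 42L
        val draws = sc.parallelize(1 to 8, 4).mapPartitionsWithIndex {
          (index, iter) =>
            // One deterministic RNG stream per partition, as in SparkDriver.
            val rng = new Random(baseSeed ^ index)
            iter.map(i => (index, i, rng.nextInt(100)))
        }
        draws.collect().foreach(println) // identical output for a fixed seed
        sc.stop()
      }
    }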

http://git-wip-us.apache.org/repos/asf/bigtop/blob/388beca8/bigtop-bigpetstore/bigpetstore-spark/src/test/scala/org/apache/bigpetstore/spark/etl/ETLSuite.scala
----------------------------------------------------------------------
diff --git a/bigtop-bigpetstore/bigpetstore-spark/src/test/scala/org/apache/bigpetstore/spark/etl/ETLSuite.scala b/bigtop-bigpetstore/bigpetstore-spark/src/test/scala/org/apache/bigpetstore/spark/etl/ETLSuite.scala
index 7ccece3..a7699ac 100644
--- a/bigtop-bigpetstore/bigpetstore-spark/src/test/scala/org/apache/bigpetstore/spark/etl/ETLSuite.scala
+++ b/bigtop-bigpetstore/bigpetstore-spark/src/test/scala/org/apache/bigpetstore/spark/etl/ETLSuite.scala
@@ -40,34 +40,31 @@ class IOUtilsSuite extends FunSuite with BeforeAndAfterAll {
   var transactions: Option[Array[Transaction]] = None
 
   val stores = Array(Store(5L, "11553"), Store(1L, "98110"), Store(6L, "66067"))
-  val locations = Array(Location("11553", "Uniondale", "NY"),
-    Location("98110", "Bainbridge Islan", "WA"),
-    Location("66067", "Ottawa", "KS"),
-    Location("20152", "Chantilly", "VA"))
+  val locations =
+    Array(
+      Location("11553", "Uniondale", "NY"),
+      Location("98110", "Bainbridge Islan", "WA"),
+      Location("66067", "Ottawa", "KS"),
+      Location("20152", "Chantilly", "VA"))
   val customers = Array(Customer(999L, "Cesareo", "Lamplough", "20152"))
-  val products = Array(
-    Product(1L, "dry dog food", Map("category" -> "dry dog food", "brand" -> "Happy Pup", "flavor" -> "Fish & Potato", "size" -> "30.0", "per_unit_cost" -> "2.67")),
-    Product(0L, "poop bags", Map("category" -> "poop bags", "brand" -> "Dog Days", "color" -> "Blue", "size" -> "60.0", "per_unit_cost" -> "0.21")),
-    Product(2L, "dry cat food", Map("category" -> "dry cat food", "brand" -> "Feisty Feline", "flavor" -> "Chicken & Rice", "size" -> "14.0", "per_unit_cost" -> "2.14")))
+  val products =
+    Array(
+      Product(1L, "dry dog food", Map("category" -> "dry dog food", "brand" -> "Happy Pup", "flavor" -> "Fish & Potato", "size" -> "30.0", "per_unit_cost" -> "2.67")),
+      Product(0L, "poop bags", Map("category" -> "poop bags", "brand" -> "Dog Days", "color" -> "Blue", "size" -> "60.0", "per_unit_cost" -> "0.21")),
+      Product(2L, "dry cat food", Map("category" -> "dry cat food", "brand" -> "Feisty Feline", "flavor" -> "Chicken & Rice", "size" -> "14.0", "per_unit_cost" -> "2.14")))
 
   val rawLines = Array(
     "5,11553,Uniondale,NY,999,Cesareo,Lamplough,20152,Chantilly,VA,32,Tue Nov 03 01:08:11 EST 2015,category=dry dog food;brand=Happy Pup;flavor=Fish & Potato;size=30.0;per_unit_cost=2.67;",
-
     "1,98110,Bainbridge Islan,WA,999,Cesareo,Lamplough,20152,Chantilly,VA,31,Mon Nov 02 17:51:37 EST 2015,category=poop bags;brand=Dog Days;color=Blue;size=60.0;per_unit_cost=0.21;",
-
     "6,66067,Ottawa,KS,999,Cesareo,Lamplough,20152,Chantilly,VA,30,Mon Oct 12 04:29:46 EDT 2015,category=dry cat food;brand=Feisty Feline;flavor=Chicken & Rice;size=14.0;per_unit_cost=2.14;")
 
-
   override def beforeAll() {
     val conf = new SparkConf().setAppName("BPS Data Generator Test Suite").setMaster("local[2]")
     sc = Some(new SparkContext(conf))
 
-    val cal1 = Calendar.getInstance(TimeZone.getTimeZone("America/New_York"),
-      Locale.US)
-    val cal2 = Calendar.getInstance(TimeZone.getTimeZone("America/New_York"),
-      Locale.US)
-    val cal3 = Calendar.getInstance(TimeZone.getTimeZone("America/New_York"),
-      Locale.US)
+    val cal1 = Calendar.getInstance(TimeZone.getTimeZone("America/New_York"), Locale.US)
+    val cal2 = Calendar.getInstance(TimeZone.getTimeZone("America/New_York"), Locale.US)
+    val cal3 = Calendar.getInstance(TimeZone.getTimeZone("America/New_York"), Locale.US)
 
     // Calendar seems to interpret months as 0-11.
     // Milliseconds are not present in the output we parse.
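
The rawLines fixtures above pin down the line-item layout the generator emits:
twelve comma-separated fields (store id/zip/city/state, customer
id/first/last/zip/city/state, transaction id, date) followed by a
semicolon-delimited product description. A parsing sketch; the LineItem case
class and parse helper here are illustrative, not part of the suite:

    case class LineItem(
      storeId: Long, storeZip: String, storeCity: String, storeState: String,
      customerId: Long, firstName: String, lastName: String,
      custZip: String, custCity: String, custState: String,
      transactionId: Long, date: String, product: String)

    object LineItemFormat {
      // None of the first twelve fields contains a comma in this format,
      // so a plain split with a field limit of 13 is sufficient.
      def parse(line: String): LineItem = {
        val f = line.split(",", 13)
        LineItem(f(0).toLong, f(1), f(2), f(3), f(4).toLong, f(5), f(6),
          f(7), f(8), f(9), f(10).toLong, f(11), f(12))
      }
    }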