Posted to commits@bigtop.apache.org by ja...@apache.org on 2014/12/18 03:11:57 UTC
bigtop git commit: BIGTOP-1273. BigPetStore Cleanup formatting.
Repository: bigtop
Updated Branches:
refs/heads/master fe15ba18b -> 388beca8e
BIGTOP-1273. BigPetStore Cleanup formatting.
Project: http://git-wip-us.apache.org/repos/asf/bigtop/repo
Commit: http://git-wip-us.apache.org/repos/asf/bigtop/commit/388beca8
Tree: http://git-wip-us.apache.org/repos/asf/bigtop/tree/388beca8
Diff: http://git-wip-us.apache.org/repos/asf/bigtop/diff/388beca8
Branch: refs/heads/master
Commit: 388beca8e1b30133024ce3c38ce8785b09f3ba10
Parents: fe15ba1
Author: jayunit100 <ja...@gmail.com>
Authored: Mon Dec 15 22:24:02 2014 -0500
Committer: jayunit100 <ja...@apache.org>
Committed: Wed Dec 17 21:11:14 2014 -0500
----------------------------------------------------------------------
.../spark/generator/SparkDriver.scala | 208 ++++++++++---------
.../apache/bigpetstore/spark/etl/ETLSuite.scala | 31 ++-
2 files changed, 125 insertions(+), 114 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/bigtop/blob/388beca8/bigtop-bigpetstore/bigpetstore-spark/src/main/scala/org/apache/bigpetstore/spark/generator/SparkDriver.scala
----------------------------------------------------------------------
diff --git a/bigtop-bigpetstore/bigpetstore-spark/src/main/scala/org/apache/bigpetstore/spark/generator/SparkDriver.scala b/bigtop-bigpetstore/bigpetstore-spark/src/main/scala/org/apache/bigpetstore/spark/generator/SparkDriver.scala
index 2d7ed17..19d1565 100644
--- a/bigtop-bigpetstore/bigpetstore-spark/src/main/scala/org/apache/bigpetstore/spark/generator/SparkDriver.scala
+++ b/bigtop-bigpetstore/bigpetstore-spark/src/main/scala/org/apache/bigpetstore/spark/generator/SparkDriver.scala
@@ -17,10 +17,11 @@
package org.apache.bigtop.bigpetstore.spark.generator
-import com.github.rnowling.bps.datagenerator.datamodels.{Store,Customer,PurchasingProfile,Transaction}
+import com.github.rnowling.bps.datagenerator.datamodels.inputs.ZipcodeRecord
+import com.github.rnowling.bps.datagenerator.datamodels._
import com.github.rnowling.bps.datagenerator.{DataLoader,StoreGenerator,CustomerGenerator => CustGen, PurchasingProfileGenerator,TransactionGenerator}
import com.github.rnowling.bps.datagenerator.framework.SeedFactory
-
+import scala.collection.JavaConversions._
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.SparkContext._
import org.apache.spark.rdd._
@@ -30,27 +31,34 @@ import scala.util.Random
import java.io.File
import java.util.Date
+/**
+ * This driver uses the data generator API to generate
+ * an arbitrarily large data set of petstore transactions.
+ *
+ * Each "transaction" consists of many "products", each of which
+ * is stringified into what is often called a "line item".
+ *
+ * Then, Spark writes those line items out as a distributed Hadoop file glob.
+ *
+ */
object SparkDriver {
private var nStores: Int = -1
private var nCustomers: Int = -1
private var simulationLength: Double = -1.0
private var seed: Long = -1
private var outputDir: String = ""
-
private val NPARAMS = 5
private def printUsage() {
- val usage: String = "BigPetStore Data Generator\n" +
- "\n" +
+ val usage: String =
+ "BigPetStore Data Generator.\n" +
"Usage: spark-submit ... outputDir nStores nCustomers simulationLength [seed]\n" +
- "\n" +
"outputDir - (string) directory to write files\n" +
"nStores - (int) number of stores to generate\n" +
"nCustomers - (int) number of customers to generate\n" +
"simulationLength - (float) number of days to simulate\n" +
"seed - (long) seed for RNG. If not given, one is reandomly generated.\n"
-
- println(usage)
+ System.err.println(usage)
}
def parseArgs(args: Array[String]) {
@@ -58,52 +66,43 @@ object SparkDriver {
printUsage()
System.exit(1)
}
-
- var i = 0
-
- outputDir = args(i)
-
- i += 1
+ outputDir = args(0)
try {
- nStores = args(i).toInt
+ nStores = args(1).toInt
}
catch {
case _ : NumberFormatException =>
- System.err.println("Unable to parse '" + args(i) + "' as an integer for nStores.\n")
+ System.err.println("Unable to parse '" + args(1) + "' as an integer for nStores.\n")
printUsage()
System.exit(1)
}
-
- i += 1
try {
- nCustomers = args(i).toInt
+ nCustomers = args(2).toInt
}
catch {
case _ : NumberFormatException =>
- System.err.println("Unable to parse '" + args(i) + "' as an integer for nCustomers.\n")
+ System.err.println("Unable to parse '" + args(2) + "' as an integer for nCustomers.\n")
printUsage()
System.exit(1)
}
-
- i += 1
try {
- simulationLength = args(i).toDouble
+ simulationLength = args(3).toDouble
}
catch {
case _ : NumberFormatException =>
- System.err.println("Unable to parse '" + args(i) + "' as a float for simulationLength.\n")
+ System.err.println("Unable to parse '" + args(3) + "' as a float for simulationLength.\n")
printUsage()
System.exit(1)
}
+ // If the optional seed argument is present, parse it; otherwise one is generated randomly.
if(args.length == NPARAMS) {
- i += 1
try {
- seed = args(i).toLong
+ seed = args(4).toLong
}
catch {
case _ : NumberFormatException =>
- System.err.println("Unable to parse '" + args(i) + "' as a long for seed.\n")
+ System.err.println("Unable to parse '" + args(4) + "' as a long for seed.\n")
printUsage()
System.exit(1)
}
@@ -113,13 +112,18 @@ object SparkDriver {
}
}
+ /**
+ * Here we generate an RDD of all the petstore transactions,
+ * by generating the static data first (stores, customers, ...)
+ * followed by running the simulation as a distributed Spark task.
+ */
def generateData(sc: SparkContext): RDD[Transaction] = {
val inputData = new DataLoader().loadData()
- val seedFactory = new SeedFactory(seed);
+ val seedFactory = new SeedFactory(seed)
println("Generating stores...")
val stores : ArrayList[Store] = new ArrayList()
- val storeGenerator = new StoreGenerator(inputData, seedFactory);
+ val storeGenerator = new StoreGenerator(inputData, seedFactory)
for(i <- 1 to nStores) {
val store = storeGenerator.generate()
stores.add(store)
@@ -133,101 +137,111 @@ object SparkDriver {
val customer = custGen.generate()
customers = customer :: customers
}
- println("Done.")
+ println("...Done generating customers.")
- println("Broadcasting stores and products")
+ println("Broadcasting stores and products...")
val storesBC = sc.broadcast(stores)
val productBC = sc.broadcast(inputData.getProductCategories())
val customerRDD = sc.parallelize(customers)
val nextSeed = seedFactory.getNextSeed()
-
- println("Defining transaction DAG")
- val transactionRDD = customerRDD.mapPartitionsWithIndex { (index, custIter) =>
- val seedFactory = new SeedFactory(nextSeed ^ index)
- val transactionIter = custIter.map{ customer =>
- val products = productBC.value
-
- val profileGen = new PurchasingProfileGenerator(products, seedFactory)
- val profile = profileGen.generate()
-
- val transGen = new TransactionGenerator(customer, profile, storesBC.value, products,
- seedFactory)
-
- var transactions : List[Transaction] = List()
- var transaction = transGen.generate()
- while(transaction.getDateTime() < simulationLength) {
- transactions = transaction :: transactions
-
- transaction = transGen.generate()
+ println("...Done broadcasting stores and products.")
+
+ println("Defining transaction DAG...")
+
+ /**
+ * See inline comments below regarding how we
+ * generate Transaction objects from Customers.
+ */
+ val transactionRDD = customerRDD.mapPartitionsWithIndex{
+ (index, custIter) =>
+ // Create a new RNG
+ val seedFactory = new SeedFactory(nextSeed ^ index)
+ val transactionIter = custIter.map{
+ customer =>
+ val products = productBC.value
+ //Create a new purchasing profile.
+ val profileGen = new PurchasingProfileGenerator(products, seedFactory)
+ val profile = profileGen.generate()
+ val transGen = new TransactionGenerator(customer, profile, storesBC.value, products, seedFactory)
+ var transactions : List[Transaction] = List()
+ var transaction = transGen.generate()
+
+ //Create a list of this customer's transactions for the time period
+ while(transaction.getDateTime() < simulationLength) {
+ transactions = transaction :: transactions
+ transaction = transGen.generate()
+ }
+ // The final result: the list of transactions produced above.
+ transactions
}
-
- transactions
- }
transactionIter
- }.flatMap( s => s)
+ }.flatMap(s => s)
+
+ println("...Done defining transaction DAG.")
println("Generating transactions...")
- val nTrans = transactionRDD.count()
- println(s"Generated $nTrans transactions.")
+ // forces RDD materialization.
+ val nTrans = transactionRDD.count()
+ println(s"... Done Generating $nTrans transactions.")
+
+ /**
+ * Return the RDD representing all the petstore transactions.
+ * This RDD is a distributed collection of instances in which
+ * a customer went to a pet store and bought a variable number of items.
+ * We can then serialize all the contents to disk.
+ */
transactionRDD
}
+ def lineItem(t: Transaction, date:Date, p:Product): String = {
+ t.getStore.getId + "," +
+ t.getStore.getLocation+ "," +
+ t.getStore.getLocation.getCity + "," +
+ t.getStore.getLocation.getState + "," +
+ t.getCustomer.getId + "," +
+ t.getCustomer.getName.getFirst + " " +t.getCustomer.getName.getSecond + "," +
+ t.getCustomer.getLocation.getZipcode + "," +
+ t.getCustomer.getLocation.getCity + "," +
+ t.getCustomer.getLocation.getState + "," +
+ t.getId + "," +
+ date + "," + p
+ }
def writeData(transactionRDD : RDD[Transaction]) {
val initialDate : Long = new Date().getTime()
- val transactionStringsRDD = transactionRDD.map { t =>
- var records : List[String] = List()
- val products = t.getProducts()
- for(i <- 0 until products.size()) {
- val p = products.get(i)
- val name = t.getCustomer().getName()
- val custLocation = t.getCustomer().getLocation()
- val storeLocation = t.getStore().getLocation()
-
- // days -> milliseconds = days * 24 h / day * 60 min / hr * 60 sec / min * 1000 ms / sec
- val dateMS = (t.getDateTime * 24.0 * 60.0 * 60.0 * 1000.0).toLong
- val date = new Date(initialDate + dateMS)
-
-
- var record = ""
- record += t.getStore().getId() + ","
- record += storeLocation.getZipcode() + ","
- record += storeLocation.getCity() + ","
- record += storeLocation.getState() + ","
-
- record += t.getCustomer().getId() + ","
- record += name.getFirst() + "," + name.getSecond() + ","
- record += custLocation.getZipcode() + ","
- record += custLocation.getCity() + ","
- record += custLocation.getState() + ","
-
- record += t.getId() + ","
- record += date + ","
- record += p
-
- records = record :: records
- }
+ val transactionStringsRDD = transactionRDD.map {
+ transaction =>
+ val products = transaction.getProducts()
+
+ /*********************************************************
+ * We define a "records" collection: a mapping of the
+ * products in each single transaction to strings.
+ *
+ * So we ultimately define an RDD of strings, where each
+ * string represents an instance of an item purchase.
+ * ********************************************************/
+ val records = products.map{
+ product =>
+ val storeLocation = transaction.getStore().getLocation()
+ // days -> milliseconds = days * 24 h / day * 60 min / hr * 60 sec / min * 1000 ms / sec
+ val dateMS = (transaction.getDateTime * 24.0 * 60.0 * 60.0 * 1000.0).toLong
+ // Return a stringified "line item", which represents a single item bought.
+ lineItem(transaction, new Date(initialDate + dateMS), product)
+ }
records
- }.flatMap { s => s }
-
+ }
+ // Distributed serialization of the records to part-r-* files...
transactionStringsRDD.saveAsTextFile(outputDir + "/transactions")
}
def main(args: Array[String]) {
parseArgs(args)
-
- println("Creating SparkConf")
val conf = new SparkConf().setAppName("BPS Data Generator")
-
- println("Creating SparkContext")
val sc = new SparkContext(conf)
-
val transactionRDD = generateData(sc)
-
writeData(transactionRDD)
-
sc.stop()
}
}
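The mapPartitionsWithIndex block above is the heart of the generator: each partition seeds its own RNG with (nextSeed ^ index), so a run is reproducible end to end while no two partitions replay the same random stream. A minimal standalone sketch of that seeding pattern, using scala.util.Random in place of the data generator's SeedFactory (the object and app names here are illustrative, not part of the commit):

import org.apache.spark.{SparkConf, SparkContext}

object SeedPerPartitionSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("seed-sketch").setMaster("local[2]"))
    val baseSeed = 42L

    // XOR the shared seed with the partition index: reproducible overall,
    // but each partition draws from its own distinct random stream.
    val samples = sc.parallelize(1 to 8, numSlices = 4).mapPartitionsWithIndex {
      (index, iter) =>
        val rng = new scala.util.Random(baseSeed ^ index)
        iter.map(i => (index, i, rng.nextInt(100)))
    }

    samples.collect().foreach(println)
    sc.stop()
  }
}

Per the usage string in printUsage, a local run of the real driver might look like this (the jar name is assumed for illustration):

spark-submit --class org.apache.bigtop.bigpetstore.spark.generator.SparkDriver \
  bigpetstore-spark.jar /tmp/bps-output 10 1000 30.0 42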
http://git-wip-us.apache.org/repos/asf/bigtop/blob/388beca8/bigtop-bigpetstore/bigpetstore-spark/src/test/scala/org/apache/bigpetstore/spark/etl/ETLSuite.scala
----------------------------------------------------------------------
diff --git a/bigtop-bigpetstore/bigpetstore-spark/src/test/scala/org/apache/bigpetstore/spark/etl/ETLSuite.scala b/bigtop-bigpetstore/bigpetstore-spark/src/test/scala/org/apache/bigpetstore/spark/etl/ETLSuite.scala
index 7ccece3..a7699ac 100644
--- a/bigtop-bigpetstore/bigpetstore-spark/src/test/scala/org/apache/bigpetstore/spark/etl/ETLSuite.scala
+++ b/bigtop-bigpetstore/bigpetstore-spark/src/test/scala/org/apache/bigpetstore/spark/etl/ETLSuite.scala
@@ -40,34 +40,31 @@ class IOUtilsSuite extends FunSuite with BeforeAndAfterAll {
var transactions: Option[Array[Transaction]] = None
val stores = Array(Store(5L, "11553"), Store(1L, "98110"), Store(6L, "66067"))
- val locations = Array(Location("11553", "Uniondale", "NY"),
- Location("98110", "Bainbridge Islan", "WA"),
- Location("66067", "Ottawa", "KS"),
- Location("20152", "Chantilly", "VA"))
+ val locations =
+ Array(
+ Location("11553", "Uniondale", "NY"),
+ Location("98110", "Bainbridge Islan", "WA"),
+ Location("66067", "Ottawa", "KS"),
+ Location("20152", "Chantilly", "VA"))
val customers = Array(Customer(999L, "Cesareo", "Lamplough", "20152"))
- val products = Array(
- Product(1L, "dry dog food", Map("category" -> "dry dog food", "brand" -> "Happy Pup", "flavor" -> "Fish & Potato", "size" -> "30.0", "per_unit_cost" -> "2.67")),
- Product(0L, "poop bags", Map("category" -> "poop bags", "brand" -> "Dog Days", "color" -> "Blue", "size" -> "60.0", "per_unit_cost" -> "0.21")),
- Product(2L, "dry cat food", Map("category" -> "dry cat food", "brand" -> "Feisty Feline", "flavor" -> "Chicken & Rice", "size" -> "14.0", "per_unit_cost" -> "2.14")))
+ val products =
+ Array(
+ Product(1L, "dry dog food", Map("category" -> "dry dog food", "brand" -> "Happy Pup", "flavor" -> "Fish & Potato", "size" -> "30.0", "per_unit_cost" -> "2.67")),
+ Product(0L, "poop bags", Map("category" -> "poop bags", "brand" -> "Dog Days", "color" -> "Blue", "size" -> "60.0", "per_unit_cost" -> "0.21")),
+ Product(2L, "dry cat food", Map("category" -> "dry cat food", "brand" -> "Feisty Feline", "flavor" -> "Chicken & Rice", "size" -> "14.0", "per_unit_cost" -> "2.14")))
val rawLines = Array(
"5,11553,Uniondale,NY,999,Cesareo,Lamplough,20152,Chantilly,VA,32,Tue Nov 03 01:08:11 EST 2015,category=dry dog food;brand=Happy Pup;flavor=Fish & Potato;size=30.0;per_unit_cost=2.67;",
-
"1,98110,Bainbridge Islan,WA,999,Cesareo,Lamplough,20152,Chantilly,VA,31,Mon Nov 02 17:51:37 EST 2015,category=poop bags;brand=Dog Days;color=Blue;size=60.0;per_unit_cost=0.21;",
-
"6,66067,Ottawa,KS,999,Cesareo,Lamplough,20152,Chantilly,VA,30,Mon Oct 12 04:29:46 EDT 2015,category=dry cat food;brand=Feisty Feline;flavor=Chicken & Rice;size=14.0;per_unit_cost=2.14;")
-
override def beforeAll() {
val conf = new SparkConf().setAppName("BPS Data Generator Test Suite").setMaster("local[2]")
sc = Some(new SparkContext(conf))
- val cal1 = Calendar.getInstance(TimeZone.getTimeZone("America/New_York"),
- Locale.US)
- val cal2 = Calendar.getInstance(TimeZone.getTimeZone("America/New_York"),
- Locale.US)
- val cal3 = Calendar.getInstance(TimeZone.getTimeZone("America/New_York"),
- Locale.US)
+ val cal1 = Calendar.getInstance(TimeZone.getTimeZone("America/New_York"), Locale.US)
+ val cal2 = Calendar.getInstance(TimeZone.getTimeZone("America/New_York"), Locale.US)
+ val cal3 = Calendar.getInstance(TimeZone.getTimeZone("America/New_York"), Locale.US)
// Calendar seems to interpret months as 0-11
// ms are not in the output we parse.
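The rawLines fixtures above give concrete examples of the line-item layout the ETL suite parses: twelve comma-separated fields (store id/zip/city/state, customer id/first/last/zip/city/state, transaction id, date) followed by the product's attributes as semicolon-separated key=value pairs. The trailing Calendar comment refers to java.util.Calendar's zero-based months (Calendar.NOVEMBER is 10), which is why the suite builds its expected dates with 0-11 month indices. A hypothetical parser for one such raw line, written only to illustrate the layout (parseProduct and the field count are read off the fixtures, not an API of the project):

object LineItemParseSketch {
  // Split the trailing "k=v;k=v;..." product attribute string into a Map.
  def parseProduct(s: String): Map[String, String] =
    s.split(";").filter(_.nonEmpty).map { kv =>
      val Array(k, v) = kv.split("=", 2)
      k -> v
    }.toMap

  def main(args: Array[String]): Unit = {
    val raw = "5,11553,Uniondale,NY,999,Cesareo,Lamplough,20152,Chantilly,VA,32," +
      "Tue Nov 03 01:08:11 EST 2015," +
      "category=dry dog food;brand=Happy Pup;flavor=Fish & Potato;size=30.0;per_unit_cost=2.67;"
    // Twelve fixed fields; everything after the twelfth comma is the product.
    val fields = raw.split(",", 13)
    println(fields(0))                // store id: "5"
    println(fields(11))               // transaction date string
    println(parseProduct(fields(12))) // Map(category -> dry dog food, ...)
  }
}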