You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bigtop.apache.org by ja...@apache.org on 2014/08/20 05:13:43 UTC
[2/2] git commit: BIGTOP-1272: Productionize the mahout recommender
BIGTOP-1272: Productionize the mahout recommender
Signed-off-by: jay@apache.org <jayunit100>
Project: http://git-wip-us.apache.org/repos/asf/bigtop/repo
Commit: http://git-wip-us.apache.org/repos/asf/bigtop/commit/4fca4573
Tree: http://git-wip-us.apache.org/repos/asf/bigtop/tree/4fca4573
Diff: http://git-wip-us.apache.org/repos/asf/bigtop/diff/4fca4573
Branch: refs/heads/master
Commit: 4fca4573b388714e305cd365f4a9bed4f4c17e8c
Parents: e9771e6
Author: bhashit parikh <bh...@gmail.com>
Authored: Thu May 29 11:07:15 2014 +0530
Committer: jay@apache.org <jayunit100>
Committed: Tue Aug 19 23:09:35 2014 -0400
----------------------------------------------------------------------
bigtop-bigpetstore/BPS_analytics.pig | 10 +-
bigtop-bigpetstore/README.md | 68 ++-
bigtop-bigpetstore/arch.dot | 27 +-
bigtop-bigpetstore/build.gradle | 327 +++++++------
.../bigtop/bigpetstore/BigPetStoreMahoutIT.java | 73 +++
.../bigtop/bigpetstore/BigPetStorePigIT.java | 68 +--
.../org/apache/bigtop/bigpetstore/ITUtils.java | 92 ++--
.../bigtop/bigpetstore/etl/PigCSVCleaner.java | 71 ++-
.../bigpetstore/generator/BPSGenerator.java | 110 ++---
.../generator/CustomerGenerator.scala | 80 ++++
...GeneratePetStoreTransactionsInputFormat.java | 134 ------
.../PetStoreTransactionInputSplit.java | 28 +-
.../PetStoreTransactionsInputFormat.java | 139 ++++++
.../generator/TransactionIteratorFactory.java | 468 -------------------
.../bigpetstore/generator/util/Product.java | 63 +++
.../bigpetstore/generator/util/ProductType.java | 29 ++
.../bigpetstore/generator/util/State.java | 26 ++
.../bigpetstore/recommend/ItemRecommender.scala | 103 ++++
.../bigpetstore/util/BigPetStoreConstants.java | 17 +-
.../bigpetstore/util/NumericalIdUtils.java | 10 +-
.../apache/bigtop/bigpetstore/util/Pair.java | 125 -----
.../bigpetstore/generator/DataForger.scala | 263 +++++++++++
.../generator/TransactionIteratorFactory.scala | 104 +++++
.../generator/TestNumericalIdUtils.java | 8 +-
.../TestPetStoreTransactionGeneratorJob.java | 10 +-
25 files changed, 1347 insertions(+), 1106 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/bigtop/blob/4fca4573/bigtop-bigpetstore/BPS_analytics.pig
----------------------------------------------------------------------
diff --git a/bigtop-bigpetstore/BPS_analytics.pig b/bigtop-bigpetstore/BPS_analytics.pig
index 44ed541..23e3749 100755
--- a/bigtop-bigpetstore/BPS_analytics.pig
+++ b/bigtop-bigpetstore/BPS_analytics.pig
@@ -38,14 +38,16 @@ csvdata =
dump:chararray,
state:chararray,
transaction:int,
+ custId:long,
fname:chararray,
lname:chararray,
- date:chararray,
+ productId:int,
+ product:chararray,
price:float,
- product:chararray);
+ date:chararray);
-- RESULT:
--- (BigPetStore,storeCode_AK,1,jay,guy,Thu Dec 18 12:17:10 EST 1969,10.5,dog-food)
+-- (BigPetStore,storeCode_AK,1,11,jay,guy,3,dog-food,10.5,Thu Dec 18 12:17:10 EST 1969)
-- ...
-- Okay! Now lets group our data so we can do some stats.
@@ -55,7 +57,7 @@ csvdata =
state_product = group csvdata by ( state, product ) ;
-- RESULT
--- ((storeCode_AK,dog-food) , {(BigPetStore,storeCode_AK,1,jay,guy,Thu Dec 18 12:17:10 EST 1969,10.5,dog-food)}) --
+-- ((storeCode_AK,dog-food) , {(BigPetStore,storeCode_AK,1,11,jay,guy,3,dog-food,10.5,Thu Dec 18 12:17:10 EST 1969)}) --
-- ...
http://git-wip-us.apache.org/repos/asf/bigtop/blob/4fca4573/bigtop-bigpetstore/README.md
----------------------------------------------------------------------
diff --git a/bigtop-bigpetstore/README.md b/bigtop-bigpetstore/README.md
index 40a9088..e58d8f3 100644
--- a/bigtop-bigpetstore/README.md
+++ b/bigtop-bigpetstore/README.md
@@ -13,7 +13,7 @@ Architecture
The application consists of the following modules
* generator: generates raw data on the dfs
-* clustering: Apache Mahout demo code for processing the data using Item based Collaborative Filtering. This feature is not supported yet. You can track its progress using this [`JIRA` issue](https://issues.apache.org/jira/browse/BIGTOP-1272)
+* recommendations: Apache Mahout demo code for generating recommendations by anaylyzing the transaction records. This feature can be tracked at this [`JIRA` issue](https://issues.apache.org/jira/browse/BIGTOP-1272)
* Pig: demo code for processing the data using Apache Pig
* Hive: demo code for processing the data using Apache Hive. This part is not complete yet. We are working on it. You can track it using this [`JIRA` issue](https://issues.apache.org/jira/browse/BIGTOP-1270)
* Crunch: demo code for processing the data using Apache Crunch
@@ -21,22 +21,22 @@ The application consists of the following modules
Build Instructions
------------------
-You'll need to have [`gradle`](http://www.gradle.org/downloads) installed and set-up correctly in order to follow along these instructions.
+You'll need to have version 2.0 of [`gradle`](http://www.gradle.org/downloads) installed and set-up correctly in order to follow along these instructions.
We could have used the [`gradle-wrapper`](http://www.gradle.org/docs/current/userguide/gradle_wrapper.html) to avoid having to install `gradle`, but the `bigtop` project includes all `gradle*` directories in `.gitignore`. So, that's not going to work.
### Build the JAR
- `gradle clean build` will build the bigpetstore `jar`. The `jar` will be located in the `build\libs` directory.
+`gradle clean build` will build the bigpetstore `jar`. The `jar` will be located in the `build\libs` directory.
### Run Intergration Tests With
* Pig profile: `gradle clean integrationTest -P ITProfile=pig`
- * Crunch profile: `gradle clean integrationTest -P ITProfile=crunch`
+ * Mahout Profile: `gradle clean integrationTest -P ITProfile=mahout`
+ * Crunch profile: Not Implemented Yet
* Hive profile: Not implemented yet.
- * Mahout profile: Not implemented yet.
If you don't specify any profile-name, or if you specify an invalid-name for the `integrationTest` task, no integration tests will be run.
-*Note:* At this stage, only the `Pig` profile is working. Will continue to update this area as further work is completed.
+*Note:* At this stage, only the `Pig` and `Mahout` profiles are working. Will continue to update this area as further work is completed.
For Eclipse Users
-----------------
@@ -87,14 +87,61 @@ The next phase of the application processes the data to create basic aggregation
- try it [on the gh-pages branch](http://jayunit100.github.io/bigpetstore/)
+
Running on a hadoop cluster
---------------------------
-wget s3://bigpetstore/bigpetstore.jar
+*Note:* For running the code using the `hadoop jar` command instead of the `gradle` tasks, you will need to set the classpath appropriately. The discussion after [this comment][jira-mahout] in JIRA could also be useful apart from these instructions.
+
+### Build the fat-jar
+
+We are going to use a fat-jar in order to avoid specifying the entire classpath ourselves.
+
+The fat-jar is required when we are running the application on a hadoop cluster. The other way would be to specify all the dependencies (including the transitive ones) manually while running the hadoop job. Fat-jars make it easier to bundle almost all the dependencies inside the distribution jar itself.
+
+```
+gradle clean shadowJar -Pfor-cluster
+```
+
+This command will build the fat-jar with all the dependencies bundled in except the hadoop, mahout and pig dependencies, which we'll specify using `-libjars` option while running the hadoop job. These dependencies are excluded to avoid conflicts with the jars provided by hadoop itself.
+
+The generated jar will be inside the `build/libs` dir, with name like `BigPetStore-x.x.x-SNAPSHOT-all.jar`. For the remainig discussion I'll refer to this jar by `bps.jar`.
+
+### Get the mahout and pig jars
+
+You'll need both mahout and pig jars with the hadoop classes excluded. Commonly, you can find both of these in their respective distros. The required pig jar is generally named like `pig-x.x.x-withouthadoop.jar` and the mahout jar would be named like `mahout-core-job.jar`. If you want, you can build those yourself by following the instructions in [this JIRA comment][jira-mahout]]. For the remaining discussion, I am going to refer to these two jars by `pig-withouthadoop.jar` and `mahout-core-job.jar`.
+
+### Setup the classpath for hadoop nodes in the cluster
+
+```
+export JARS="/usr/lib/pig/pig-withouthadoop.jar,/usr/lib/mahout/mahout-core-job.jar"
+```
+
+We also need these jars to be present on the client side to kick-off the jobs. Reusing the `JARS` variable to put the same jars on the client classpath.
+
+```
+export HADOOP_CLASSPATH=`echo $JARS | sed s/,/:/g`
+```
+
+### Generate the data
+
+```
+hadoop jar bps.jar org.apache.bigtop.bigpetstore.generator.BPSGenerator 1000000 bigpetstore/gen
+```
+
+### Clean with pig
+
+```
+hadoop jar bps.jar org.apache.bigtop.bigpetstore.etl.PigCSVCleaner -libjars $JARS bigpetstore/gen/ bigpetstore/ custom_pigscript.pig
+```
+
+### Analyze and generate recommendations with mahout
+
+```
+hadoop jar bps.jar org.apache.bigtop.bigpetstore.recommend.ItemRecommender -libjars $JARS bigpetstore/pig/Mahout bigpetstore/Mahout/AlsFactorization bigpetstore/Mahout/AlsRecommendations
+```
-hadoop jar bigpetstore.jar org.apache.bigtop.bigpetstore.generator.BPSGenerator 1000000 bigpetstore/gen
-hadoop jar bigpetstore.jar org.apache.bigtop.bigpetstore.etl.PigCSVCleaner bigpetstore/gen/ bigpetstore/pig/ custom_pigscript.pig
... (will add more steps as we add more phases to the workflow) ...
@@ -134,3 +181,6 @@ of EMR setup w/ a custom script).
...
And so on.
+
+
+[jira-mahout]: https://issues.apache.org/jira/browse/BIGTOP-1272?focusedCommentId=14076023&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-1407602
http://git-wip-us.apache.org/repos/asf/bigtop/blob/4fca4573/bigtop-bigpetstore/arch.dot
----------------------------------------------------------------------
diff --git a/bigtop-bigpetstore/arch.dot b/bigtop-bigpetstore/arch.dot
index 0f3f404..7d17c5a 100644
--- a/bigtop-bigpetstore/arch.dot
+++ b/bigtop-bigpetstore/arch.dot
@@ -6,7 +6,7 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
-*
+*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -18,25 +18,24 @@ digraph bigpetstore {
node [shape=record];
- PROD_And_USER_HASH_FUNC [label="python or datafu udf" ,style="rounded,filled", shape=diamond];
+ BPSAnalytics [label="BPSAnalytics.pig" ,style="rounded, filled", shape=diamond];
CUSTOMER_PAGE [label="CUSTOMER_PAGE|json|CUSTOMER_PAGE/part*"];
DIRTY_CSV [label="DIRTY_CSV|fname lname -prod , price ,prod,..|generated/part*"];
CSV [label="CSV|fname,lname,prod,price,date,xcoord,ycoord,...|cleaned/part*"];
- MAHOUT_VIEW_INPUT [label="MAHOUT_VIEW | (hashed name) 10001, (hashed purchases) 203 | <hive_warehouse>/mahout_cf_in/part*" ];
- MAHOUT_CF [label="MAHOUT collaborative filter output | (hashed name) 10001, (hashed product) 201, .6 | mahout_cf_out/part*" ];
+ MAHOUT_VIEW_INPUT [label="MAHOUT_VIEW | (user-id) 10001 (product-id) 203 (implicit-rating) 1 | cleaned/Mahout/part*" ];
+ MAHOUT_ALS [label="Parallel ALS Recommender output | (user-id) 10001 [(product-id) 201: (recommendation-strength 0-1)0.546] | Mahout/AlsRecommendations/part*" ];
Generate -> DIRTY_CSV [label="hadoop jar bigpetstore.jar org.bigtop.bigpetstore.generator.BPSGenerator 100 bps/generated/"] ;
- DIRTY_CSV -> pig [label=""];
+ DIRTY_CSV -> pig [label="hadoop jar bigpetstore.jar org.bigtop.bigpetstore.etl.PigCSVCleaner bps/generated/ bps/cleaned/ "];
- pig -> CSV [label="hadoop jar bigpetstore.jar org.bigtop.bigpetstore.etl.PigCSVCleaner bps/generated/ bps/cleaned/ "];
- CSV -> MAHOUT_VIEW_INPUT [label="BPS_Mahout_Viewbuilder.pig"];
- PROD_And_USER_HASH_FUNC -> MAHOUT_VIEW_INPUT [label="used in BPS_MAHOUT_Viewbuilder.pig script"] ;
+ pig -> CSV [label="pig query to clean up generated transaction records"];
+ pig -> MAHOUT_VIEW_INPUT [label="pig query to produce mahout input format"];
- MAHOUT_VIEW_INPUT -> mahout;
- mahout -> MAHOUT_CF [label="hadoop jar bigpetstore.jar org.bigtop.bigpetstore.analytics.BPSRecommender bps/mahout_cf_in/part* bps/mahout_cf_out/"];
+ MAHOUT_VIEW_INPUT -> ParallelALSFactorizationJob [label="hadoop jar bigpetstore.jar org.apache.bigtop.bigpetstore.recommend.ItemRecommender cleaned/Mahout Mahout/AlsFactorization Mahout/AlsRecommendations"];
+ ParallelALSFactorizationJob -> "Mahout RecommenderJob"
+ "Mahout RecommenderJob" -> MAHOUT_ALS
- CSV -> pig_job2;
- MAHOUT_CF -> pig_job2 ;
- PROD_And_USER_HASH_FUNC -> pig_job2;
- pig_job2 -> CUSTOMER_PAGE [label="hadoop jar bigpetstore.jar org.bigtop.bigpetstore.analytics.BPSRecommender bpg/cleaned/ bps/mahout_cf_out/"];
+ CSV -> BPSAnalytics;
+ BPSAnalytics -> pig_job2;
+ pig_job2 -> CUSTOMER_PAGE [label=""];
}
http://git-wip-us.apache.org/repos/asf/bigtop/blob/4fca4573/bigtop-bigpetstore/build.gradle
----------------------------------------------------------------------
diff --git a/bigtop-bigpetstore/build.gradle b/bigtop-bigpetstore/build.gradle
index efb69b3..a6a8c1a 100644
--- a/bigtop-bigpetstore/build.gradle
+++ b/bigtop-bigpetstore/build.gradle
@@ -3,6 +3,14 @@ apply plugin: "eclipse"
// TODO add idea module config.
apply plugin: "idea"
apply plugin: "scala"
+apply plugin: 'com.github.johnrengelman.shadow'
+
+buildscript {
+ repositories { jcenter() }
+ dependencies {
+ classpath 'com.github.jengelman.gradle.plugins:shadow:1.0.2'
+ }
+}
// Read the groupId and version properties from the "parent" bigtop project.
// It would be better if there was some better way of doing this. Howvever,
@@ -10,9 +18,9 @@ apply plugin: "scala"
// projects can't have maven projects as parents (AFAIK. If there is a way to do it,
// it doesn't seem to be well-documented).
def setProjectProperties() {
- Node xml = new XmlParser().parse("../pom.xml")
- group = xml.groupId.first().value().first()
- version = xml.version.first().value().first()
+ Node xml = new XmlParser().parse("../pom.xml")
+ group = xml.groupId.first().value().first()
+ version = xml.version.first().value().first()
}
setProjectProperties()
@@ -27,40 +35,49 @@ targetCompatibility = 1.7
// Specify any additional project properties.
ext {
- slf4jVersion = "1.7.5"
- guavaVersion = "15.0"
- hadoopVersion = "2.2.0"
- datanucleusVersion = "3.2.2"
- datanucleusJpaVersion = "3.2.1"
- bonecpVersion = "0.8.0.RELEASE"
- derbyVersion = "10.10.1.1"
+ slf4jVersion = "1.7.5"
+ guavaVersion = "15.0"
+ datanucleusVersion = "3.2.2"
+ datanucleusJpaVersion = "3.2.1"
+ bonecpVersion = "0.8.0.RELEASE"
+ derbyVersion = "10.10.1.1"
+
+ // from horton-works repo. They compile mahout-core against hadoop2.x. These
+ // mahout is compiled against 2.4.0
+ hadoopVersion = "2.4.0.2.1.2.0-402"
+ mahoutVersion = "0.9.0.2.1.2.0-402"
}
repositories {
- mavenCentral()
+ mavenCentral()
+ maven {
+ url "http://repo.hortonworks.com/content/repositories/releases/"
+ }
}
-tasks.withType(Compile) {
- options.encoding = 'UTF-8'
- options.compilerArgs << "-Xlint:all"
+tasks.withType(AbstractCompile) {
+ options.encoding = 'UTF-8'
+ options.compilerArgs << "-Xlint:all"
}
tasks.withType(ScalaCompile) {
- // Enables incremental compilation.
- // http://www.gradle.org/docs/current/userguide/userguide_single.html#N12F78
- scalaCompileOptions.useAnt = false
+ // Enables incremental compilation.
+ // http://www.gradle.org/docs/current/userguide/userguide_single.html#N12F78
+ scalaCompileOptions.useAnt = false
}
tasks.withType(Test) {
- testLogging {
- // Uncomment this if you want to see the console output from the tests.
- // showStandardStreams = true
- events "passed", "skipped", "failed"
- }
+ testLogging {
+ // Uncomment this if you want to see the console output from the tests.
+ // showStandardStreams = true
+ events "passed", "skipped", "failed"
+ // show standard out and standard error of the test JVM(s) on the console
+ //showStandardStreams = true
+ }
}
test {
- exclude "**/*TestPig.java", "**/*TestHiveEmbedded.java", "**/*TestCrunch.java", "**/*TestPetStoreTransactionGeneratorJob.java"
+ exclude "**/*TestPig.java", "**/*TestHiveEmbedded.java", "**/*TestCrunch.java", "**/*TestPetStoreTransactionGeneratorJob.java"
}
// Create a separate source-set for the src/integrationTest set of classes. The convention here
@@ -68,19 +85,23 @@ test {
// under the 'src' directory. So, in this case, it will look for a directory named 'src/integrationTest'
// since the name of the source-set is 'integrationTest'
sourceSets {
- // The main and test source-sets are configured by both java and scala plugins. They contain
- // all the src/main and src/test classes. The following statements make all of those classes
- // available on the classpath for the integration-tests, for both java and scala.
- integrationTest {
- java {
- compileClasspath += main.output + test.output
- runtimeClasspath += main.output + test.output
- }
- scala {
- compileClasspath += main.output + test.output
- runtimeClasspath += main.output + test.output
- }
- }
+ main {
+ java.srcDirs = [];
+ scala.srcDirs = ["src/main/scala", "src/main/java"]
+ }
+ // The main and test source-sets are configured by both java and scala plugins. They contain
+ // all the src/main and src/test classes. The following statements make all of those classes
+ // available on the classpath for the integration-tests, for both java and scala.
+ integrationTest {
+ java {
+ compileClasspath += main.output + test.output
+ runtimeClasspath += main.output + test.output
+ }
+ scala {
+ compileClasspath += main.output + test.output
+ runtimeClasspath += main.output + test.output
+ }
+ }
}
// Creating a source-set automatically add a couple of corresponding configurations (when java/scala
@@ -91,120 +112,164 @@ sourceSets {
// available for integrationTestRuntime. For ex. the testCompile configuration has a dependency on
// jUnit and scalatest. This makes them available for the integration tests as well.
configurations {
- integrationTestCompile {
- extendsFrom testCompile
- }
+ integrationTestCompile {
+ extendsFrom testCompile
+ }
- integrationTestRuntime {
- extendsFrom integrationTestCompile, testRuntime
- }
+ integrationTestRuntime {
+ extendsFrom integrationTestCompile, testRuntime
+ }
}
// To see the API that is being used here, consult the following docs
// http://www.gradle.org/docs/current/dsl/org.gradle.api.artifacts.ResolutionStrategy.html
def updateDependencyVersion(dependencyDetails, dependencyString) {
- def parts = dependencyString.split(':')
- def group = parts[0]
- def name = parts[1]
- def version = parts[2]
- if (dependencyDetails.requested.group == group
- && dependencyDetails.requested.name == name) {
- dependencyDetails.useVersion version
- }
+ def parts = dependencyString.split(':')
+ def group = parts[0]
+ def name = parts[1]
+ def version = parts[2]
+ if (dependencyDetails.requested.group == group
+ && dependencyDetails.requested.name == name) {
+ dependencyDetails.useVersion version
+ }
}
def setupPigIntegrationTestDependencyVersions(dependencyResolveDetails) {
- // This is the way we override the dependencies.
- updateDependencyVersion dependencyResolveDetails, "joda-time:joda-time:2.2"
+ // This is the way we override the dependencies.
+ updateDependencyVersion dependencyResolveDetails, "joda-time:joda-time:2.2"
}
def setupCrunchIntegrationTestDependencyVersions(dependencyResolveDetails) {
- // Specify any dependencies that you want to override for crunch integration tests.
+ // Specify any dependencies that you want to override for crunch integration tests.
+}
+
+def setupMahoutIntegrationTestDependencyVersions(dependencyResolveDetails) {
+ // Specify any dependencies that you want to override for mahout integration tests.
}
+
task integrationTest(type: Test, dependsOn: test) {
- testClassesDir = sourceSets.integrationTest.output.classesDir
- classpath = sourceSets.integrationTest.runtimeClasspath
-
- if(!project.hasProperty('ITProfile')) {
- // skip integration-tests if no profile has been specified.
- integrationTest.onlyIf { false }
- return;
- }
-
- def patternsToInclude
- def dependencyConfigClosure
- def skipDependencyUpdates = false
- // Select the pattern for test classes that should be executed, and the dependency
- // configuration function to be called based on the profile name specified at the command line.
- switch (project.ITProfile) {
- case "pig":
- patternsToInclude = "*PigIT*"
- dependencyConfigClosure = { setupPigIntegrationTestDependencyVersions(it) }
- break
- case "crunch":
- patternsToInclude = "*CrunchIT*"
- dependencyConfigClosure = { setupCrunchIntegrationTestDependencyVersions(it) }
- break
- // skip integration-tests if the passed in profile-name is not valid
- default: integrationTest.onlyIf { false }; return
- }
-
-
- filter { includeTestsMatching patternsToInclude }
-
- // This is the standard way gradle allows overriding each specific dependency.
- // see: http://www.gradle.org/docs/current/dsl/org.gradle.api.artifacts.ResolutionStrategy.html
- project.configurations.all {
- resolutionStrategy {
- eachDependency {
- dependencyConfigClosure(it)
- }
- }
- }
+ testClassesDir = sourceSets.integrationTest.output.classesDir
+ classpath = sourceSets.integrationTest.runtimeClasspath
+
+ if(!project.hasProperty('ITProfile')) {
+ // skip integration-tests if no profile has been specified.
+ integrationTest.onlyIf { false }
+ return;
+ }
+
+ def patternsToInclude
+ def dependencyConfigClosure
+ def skipDependencyUpdates = false
+ // Select the pattern for test classes that should be executed, and the dependency
+ // configuration function to be called based on the profile name specified at the command line.
+ switch (project.ITProfile) {
+ case "pig":
+ patternsToInclude = "*PigIT*"
+ dependencyConfigClosure = { setupPigIntegrationTestDependencyVersions(it) }
+ break
+ case "crunch":
+ patternsToInclude = "*CrunchIT*"
+ dependencyConfigClosure = { setupCrunchIntegrationTestDependencyVersions(it) }
+ break
+ case "mahout":
+ patternsToInclude = "*MahoutIT*"
+ dependencyConfigClosure = { setupMahoutIntegrationTestDependencyVersions(it) }
+ break
+ // skip integration-tests if the passed in profile-name is not valid
+ default: integrationTest.onlyIf { false }; return
+ }
+
+
+ filter { includeTestsMatching patternsToInclude }
+
+ // This is the standard way gradle allows overriding each specific dependency.
+ // see: http://www.gradle.org/docs/current/dsl/org.gradle.api.artifacts.ResolutionStrategy.html
+ project.configurations.all {
+ resolutionStrategy {
+ eachDependency {
+ dependencyConfigClosure(it)
+ }
+ }
+ }
}
dependencies {
- compile "org.kohsuke:graphviz-api:1.0"
- compile "org.apache.crunch:crunch-core:0.9.0-hadoop2"
- compile "com.jolbox:bonecp:${project.bonecpVersion}"
- compile "org.apache.derby:derby:${project.derbyVersion}"
- compile "com.google.guava:guava:${project.guavaVersion}"
- compile "commons-lang:commons-lang:2.6"
- compile "joda-time:joda-time:2.3"
- compile "org.apache.commons:commons-lang3:3.1"
- compile "com.google.protobuf:protobuf-java:2.5.0"
- compile "commons-logging:commons-logging:1.1.3"
- compile "com.thoughtworks.xstream:xstream:+"
- compile "org.apache.lucene:lucene-core:+"
- compile "org.apache.lucene:lucene-analyzers-common:+"
- compile "org.apache.solr:solr-commons-csv:3.5.0"
- compile "org.apache.hadoop:hadoop-client:${project.hadoopVersion}"
- compile group: "org.apache.pig", name: "pig", version: "0.12.0", classifier:"h2"
- compile "org.slf4j:slf4j-api:${project.slf4jVersion}"
- compile "log4j:log4j:1.2.12"
- compile "org.slf4j:slf4j-log4j12:${project.slf4jVersion}"
- compile "org.datanucleus:datanucleus-core:${project.datanucleusVersion}"
- compile "org.datanucleus:datanucleus-rdbms:${project.datanucleusJpaVersion}"
- compile "org.datanucleus:datanucleus-api-jdo:${project.datanucleusJpaVersion}"
- compile "org.datanucleus:datanucleus-accessplatform-jdo-rdbms:${project.datanucleusJpaVersion}"
- compile group: "org.apache.mrunit", name: "mrunit", version: "1.0.0", classifier:"hadoop2"
-
- compile 'org.scala-lang:scala-library:2.10.0'
-
- testCompile "junit:junit:4.11"
- testCompile "org.hamcrest:hamcrest-all:1.3"
- testCompile "org.scalatest:scalatest_2.10:2.1.7"
+ compile "org.kohsuke:graphviz-api:1.0"
+ compile "org.apache.crunch:crunch-core:0.9.0-hadoop2"
+ compile "com.jolbox:bonecp:${project.bonecpVersion}"
+ compile "org.apache.derby:derby:${project.derbyVersion}"
+ compile "com.google.guava:guava:${project.guavaVersion}"
+ compile "commons-lang:commons-lang:2.6"
+ compile "joda-time:joda-time:2.3"
+ compile "org.apache.commons:commons-lang3:3.1"
+ compile "com.google.protobuf:protobuf-java:2.5.0"
+ compile "commons-logging:commons-logging:1.1.3"
+ compile "com.thoughtworks.xstream:xstream:+"
+ compile "org.apache.lucene:lucene-core:+"
+ compile "org.apache.lucene:lucene-analyzers-common:+"
+ compile "org.apache.solr:solr-commons-csv:3.5.0"
+
+ compile group: "org.apache.pig", name: "pig", version: "0.12.0", classifier:"h2"
+ compile "org.slf4j:slf4j-api:${project.slf4jVersion}"
+ compile "log4j:log4j:1.2.12"
+ compile "org.slf4j:slf4j-log4j12:${project.slf4jVersion}"
+ compile "org.datanucleus:datanucleus-core:${project.datanucleusVersion}"
+ compile "org.datanucleus:datanucleus-rdbms:${project.datanucleusJpaVersion}"
+ compile "org.datanucleus:datanucleus-api-jdo:${project.datanucleusJpaVersion}"
+ compile "org.datanucleus:datanucleus-accessplatform-jdo-rdbms:${project.datanucleusJpaVersion}"
+ compile group: "org.apache.mrunit", name: "mrunit", version: "1.0.0", classifier:"hadoop2"
+
+ compile "org.jfairy:jfairy:0.2.4"
+
+ // from horton-works repo. They compile mahout-core against hadoop2.x
+ compile "org.apache.hadoop:hadoop-client:${hadoopVersion}"
+ compile "org.apache.mahout:mahout-core:${mahoutVersion}"
+
+ compile 'org.scala-lang:scala-library:2.11.0'
+
+ testCompile "junit:junit:4.11"
+ testCompile "org.hamcrest:hamcrest-all:1.3"
+ testCompile "org.scalatest:scalatest_2.11:2.1.7"
}
-eclipse {
- classpath {
- // Add the sependencies and the src dirs for the integrationTest source-set to the
- // .classpath file that will be generated by the eclipse plugin.
- plusConfigurations += configurations.integrationTestCompile
- // Uncomment the following two lines if you want to generate an eclipse project quickly.
- downloadSources = false
- downloadJavadoc = false
- }
+configurations {
+ /* hadoopClusterRuntime */ runtime {
+ // extendsFrom integrationTestRuntime
+ if(project.hasProperty('for-cluster')) {
+ excludeRules += [getGroup: { 'org.apache.crunch' }, getModule: { 'crunch-core' } ] as ExcludeRule
+ excludeRules += [getGroup: { 'org.apache.pig' }, getModule: { 'pig' } ] as ExcludeRule
+ excludeRules += [getGroup: { 'org.apache.mahout' }, getModule: { 'mahout-core' } ] as ExcludeRule
+ excludeRules += [getGroup: { 'org.apache.hadoop' }, getModule: { 'hadoop-client' } ] as ExcludeRule
+ }
+ }
}
+
+task listJars << {
+ configurations.shadow.each { println it.name }
+}
+
+def copyDependencyJarsForHadoopCluster() {
+ copy {
+ from configurations.hadoopClusterRuntime
+ into 'build/libs'
+ }
+}
+
+build {
+ doLast {
+ copyDependencyJarsForHadoopCluster()
+ }
+}
+
+eclipse {
+ classpath {
+ // Add the dependencies and the src dirs for the integrationTest source-set to the
+ // .classpath file that will be generated by the eclipse plugin.
+ plusConfigurations += [configurations.integrationTestCompile]
+ // Comment out the following two lines if you want to generate an eclipse project quickly.
+ downloadSources = true
+ downloadJavadoc = false
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/bigtop/blob/4fca4573/bigtop-bigpetstore/src/integrationTest/java/org/apache/bigtop/bigpetstore/BigPetStoreMahoutIT.java
----------------------------------------------------------------------
diff --git a/bigtop-bigpetstore/src/integrationTest/java/org/apache/bigtop/bigpetstore/BigPetStoreMahoutIT.java b/bigtop-bigpetstore/src/integrationTest/java/org/apache/bigtop/bigpetstore/BigPetStoreMahoutIT.java
new file mode 100644
index 0000000..b07c5a0
--- /dev/null
+++ b/bigtop-bigpetstore/src/integrationTest/java/org/apache/bigtop/bigpetstore/BigPetStoreMahoutIT.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bigtop.bigpetstore;
+
+import static org.apache.bigtop.bigpetstore.ITUtils.createTestOutputPath;
+import static org.apache.bigtop.bigpetstore.ITUtils.setup;
+
+import java.util.regex.Pattern;
+
+import org.apache.bigtop.bigpetstore.recommend.ItemRecommender;
+import org.apache.bigtop.bigpetstore.util.BigPetStoreConstants.OUTPUTS.MahoutPaths;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.base.Predicate;
+
+public class BigPetStoreMahoutIT {
+
+ public static final Path INPUT_DIR_PATH =
+ new Path(ITUtils.BPS_TEST_PIG_CLEANED, MahoutPaths.Mahout.name());
+ public static final String INPUT_DIR_PATH_STR = INPUT_DIR_PATH.toString();
+ private static final Path MAHOUT_OUTPUT_DIR = createTestOutputPath(MahoutPaths.Mahout.name());
+ private static final Path ALS_FACTORIZATION_OUTPUT_DIR =
+ createTestOutputPath(MahoutPaths.Mahout.name(), MahoutPaths.AlsFactorization.name());
+ private static final Path ALS_RECOMMENDATIONS_DIR =
+ createTestOutputPath(MahoutPaths.Mahout.name(), MahoutPaths.AlsRecommendations.name());
+
+ private ItemRecommender itemRecommender;
+
+ @Before
+ public void setupTest() throws Throwable {
+ setup();
+ try {
+ FileSystem fs = FileSystem.get(new Configuration());
+ fs.delete(MAHOUT_OUTPUT_DIR, true);
+ itemRecommender = new ItemRecommender(INPUT_DIR_PATH_STR, ALS_FACTORIZATION_OUTPUT_DIR.toString(),
+ ALS_RECOMMENDATIONS_DIR.toString());
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private static final Predicate<String> TEST_OUTPUT_FORMAT = new Predicate<String>() {
+ private final Pattern p = Pattern.compile("^\\d+\\s\\[\\d+:\\d+\\.\\d+\\]$");
+ @Override
+ public boolean apply(String input) {
+ return p.matcher(input).matches();
+ }
+ };
+
+ @Test
+ public void testPetStorePipeline() throws Exception {
+ itemRecommender.recommend();
+ ITUtils.assertOutput(ALS_RECOMMENDATIONS_DIR, TEST_OUTPUT_FORMAT);
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/bigtop/blob/4fca4573/bigtop-bigpetstore/src/integrationTest/java/org/apache/bigtop/bigpetstore/BigPetStorePigIT.java
----------------------------------------------------------------------
diff --git a/bigtop-bigpetstore/src/integrationTest/java/org/apache/bigtop/bigpetstore/BigPetStorePigIT.java b/bigtop-bigpetstore/src/integrationTest/java/org/apache/bigtop/bigpetstore/BigPetStorePigIT.java
index 045a9cf..78d5c6b 100644
--- a/bigtop-bigpetstore/src/integrationTest/java/org/apache/bigtop/bigpetstore/BigPetStorePigIT.java
+++ b/bigtop-bigpetstore/src/integrationTest/java/org/apache/bigtop/bigpetstore/BigPetStorePigIT.java
@@ -19,26 +19,22 @@ import static org.apache.bigtop.bigpetstore.ITUtils.BPS_TEST_GENERATED;
import static org.apache.bigtop.bigpetstore.ITUtils.BPS_TEST_PIG_CLEANED;
import static org.apache.bigtop.bigpetstore.ITUtils.fs;
-import java.io.BufferedReader;
import java.io.File;
-import java.io.InputStreamReader;
import java.util.Map;
import java.util.Map.Entry;
import org.apache.bigtop.bigpetstore.etl.PigCSVCleaner;
import org.apache.bigtop.bigpetstore.util.BigPetStoreConstants;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.pig.ExecType;
-import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.base.Function;
+import com.google.common.base.Predicate;
import com.google.common.collect.ImmutableMap;
/**
@@ -76,68 +72,24 @@ public class BigPetStorePigIT {
FileSystem.get(new Configuration()).delete(BPS_TEST_PIG_CLEANED, true);
FileSystem.get(new Configuration()).delete(BPS_TEST_PIG_COUNT_PRODUCTS, true);
} catch (Exception e) {
- System.out.println("didnt need to delete pig output.");
- // not necessarily an error
+ throw new RuntimeException(e);
}
}
- static Map<Path, Function<String, Boolean>> TESTS = ImmutableMap.of(
+ static Map<Path, Predicate<String>> TESTS = ImmutableMap.of(
/** Test of the main output */
- BPS_TEST_PIG_CLEANED, new Function<String, Boolean>() {
- public Boolean apply(String x) {
- // System.out.println("Verified...");
- return true;
- }
- },
- // Example of how to count products
- // after doing basic pig data cleanup
- BPS_TEST_PIG_COUNT_PRODUCTS, new Function<String, Boolean>() {
- // Jeff'
- public Boolean apply(String x) {
- return true;
- }
- }
+ BPS_TEST_PIG_CLEANED, ITUtils.VERIFICATION_PERDICATE,
+ // Example of how to count products after doing basic pig data cleanup
+ BPS_TEST_PIG_COUNT_PRODUCTS, ITUtils.VERIFICATION_PERDICATE,
+ // Test the output that is to be used as an input for Mahout.
+ BigPetStoreMahoutIT.INPUT_DIR_PATH, ITUtils.VERIFICATION_PERDICATE
);
- /**
- * The "core" task reformats data to TSV. lets test that first.
- */
@Test
public void testPetStoreCorePipeline() throws Exception {
runPig(BPS_TEST_GENERATED, BPS_TEST_PIG_CLEANED, PIG_SCRIPT);
- for (Entry<Path, Function<String, Boolean>> e : TESTS.entrySet()) {
- assertOutput(e.getKey(), e.getValue());
- }
- }
-
- public static void assertOutput(Path base,
- Function<String, Boolean> validator) throws Exception {
- FileSystem fs = FileSystem.getLocal(new Configuration());
-
- FileStatus[] files = fs.listStatus(base);
- // print out all the files.
- for (FileStatus stat : files) {
- System.out.println(stat.getPath() + " " + stat.getLen());
- }
-
- /**
- * Support map OR reduce outputs
- */
- Path partm = new Path(base, "part-m-00000");
- Path partr = new Path(base, "part-r-00000");
- Path p = fs.exists(partm) ? partm : partr;
-
- /**
- * Now we read through the file and validate its contents.
- */
- BufferedReader r = new BufferedReader(new InputStreamReader(fs.open(p)));
-
- // line:{"product":"big chew toy","count":3}
- while (r.ready()) {
- String line = r.readLine();
- log.info("line:" + line);
- // System.out.println("line:"+line);
- Assert.assertTrue("validationg line : " + line, validator.apply(line));
+ for (Entry<Path, Predicate<String>> e : TESTS.entrySet()) {
+ ITUtils.assertOutput(e.getKey(), e.getValue());
}
}
http://git-wip-us.apache.org/repos/asf/bigtop/blob/4fca4573/bigtop-bigpetstore/src/integrationTest/java/org/apache/bigtop/bigpetstore/ITUtils.java
----------------------------------------------------------------------
diff --git a/bigtop-bigpetstore/src/integrationTest/java/org/apache/bigtop/bigpetstore/ITUtils.java b/bigtop-bigpetstore/src/integrationTest/java/org/apache/bigtop/bigpetstore/ITUtils.java
index df3b948..fd53dc1 100644
--- a/bigtop-bigpetstore/src/integrationTest/java/org/apache/bigtop/bigpetstore/ITUtils.java
+++ b/bigtop-bigpetstore/src/integrationTest/java/org/apache/bigtop/bigpetstore/ITUtils.java
@@ -15,6 +15,8 @@
*/
package org.apache.bigtop.bigpetstore;
+import java.io.BufferedReader;
+import java.io.InputStreamReader;
import java.net.InetAddress;
import java.nio.charset.Charset;
import java.util.List;
@@ -22,15 +24,26 @@ import java.util.List;
import org.apache.bigtop.bigpetstore.generator.BPSGenerator;
import org.apache.bigtop.bigpetstore.util.BigPetStoreConstants;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
+import org.junit.Assert;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.base.Predicate;
import com.google.common.io.Files;
public class ITUtils {
+ public static final Path TEST_OUTPUT_DIR = new Path("bps_integration_");
+
+ public static Predicate<String> VERIFICATION_PERDICATE = new Predicate<String>() {
+ @Override
+ public boolean apply(String input) {
+ return true;
+ }
+ };
static final Logger log = LoggerFactory.getLogger(ITUtils.class);
@@ -46,26 +59,25 @@ public class ITUtils {
msg += cp.replaceAll("hadoop", "**HADOOP**") + "\n";
}
}
- throw new RuntimeException("Major error: Probably issue. " + "Check hadoop version? " + e.getMessage()
- + " .... check these classpath elements:" + msg);
+ throw new RuntimeException("Major error: Probably issue. "
+ + "Check hadoop version? " + e.getMessage()
+ + " .... check these classpath elements:" + msg);
}
}
- public static final Path BPS_TEST_GENERATED = fs.makeQualified(new Path("bps_integration_",
- BigPetStoreConstants.OUTPUTS.generated.name()));
- public static final Path BPS_TEST_PIG_CLEANED = fs.makeQualified(new Path("bps_integration_",
- BigPetStoreConstants.OUTPUTS.cleaned.name()));
- public static final Path BPS_TEST_MAHOUT_IN = fs.makeQualified(new Path("bps_integration_",
- BigPetStoreConstants.OUTPUTS.MAHOUT_CF_IN.name()));
- public static final Path BPS_TEST_MAHOUT_OUT = fs.makeQualified(new Path("bps_integration_",
- BigPetStoreConstants.OUTPUTS.MAHOUT_CF_OUT.name()));
-
- public static void main(String[] args) {
+ public static final Path BPS_TEST_GENERATED =
+ createTestOutputPath(BigPetStoreConstants.OUTPUTS.generated.name());
+ public static final Path BPS_TEST_PIG_CLEANED =
+ createTestOutputPath (BigPetStoreConstants.OUTPUTS.cleaned.name());
+
+ public static Path createTestOutputPath(String... pathParts) {
+ Path path = TEST_OUTPUT_DIR;
+ for(String pathPart: pathParts) {
+ path = new Path(path, pathPart);
+ }
+ return path;
}
- // public static final Path CRUNCH_OUT = new
- // Path("bps_integration_",BigPetStoreConstants.OUTPUT_3).makeQualified(fs);
-
/**
* Some simple checks to make sure that unit tests in local FS. these arent
* designed to be run against a distribtued system.
@@ -99,29 +111,18 @@ public class ITUtils {
* test_data_directory/generated/part-r-00000
*/
public static void setup() throws Throwable {
- int records = 10;
- /**
- * Setup configuration with prop.
- */
Configuration conf = new Configuration();
- // debugging for jeff and others in local fs
- // that wont build
+ // debugging for Jeff and others in local fs that won't build
checkConf(conf);
- conf.setInt(BPSGenerator.props.bigpetstore_records.name(), records);
+ conf.setInt(BPSGenerator.props.bigpetstore_records.name(), BPSGenerator.DEFAULT_NUM_RECORDS);
- /**
- * Only create if doesnt exist already.....
- */
if (FileSystem.getLocal(conf).exists(BPS_TEST_GENERATED)) {
return;
}
- /**
- * Create the data set.
- */
- Job createInput = BPSGenerator.createJob(BPS_TEST_GENERATED, conf);
+ Job createInput = BPSGenerator.getCreateTransactionRecordsJob(BPS_TEST_GENERATED, conf);
createInput.waitForCompletion(true);
Path outputfile = new Path(BPS_TEST_GENERATED, "part-r-00000");
@@ -131,4 +132,37 @@ public class ITUtils {
System.out.println(l);
}
}
+
+
+ // A functions that logs the output file as a verification test
+ public static void assertOutput(Path base, Predicate<String> validator) throws Exception {
+ FileSystem fs = FileSystem.getLocal(new Configuration());
+
+ FileStatus[] files = fs.listStatus(base);
+ // print out all the files.
+ for (FileStatus stat : files) {
+ System.out.println(stat.getPath() + " " + stat.getLen());
+ }
+
+ /**
+ * Support map OR reduce outputs
+ */
+ Path partm = new Path(base, "part-m-00000");
+ Path partr = new Path(base, "part-r-00000");
+ Path p = fs.exists(partm) ? partm : partr;
+
+ /**
+ * Now we read through the file and validate its contents.
+ */
+ BufferedReader r = new BufferedReader(new InputStreamReader(fs.open(p)));
+
+ // line:{"product":"big chew toy","count":3}
+ while (r.ready()) {
+ String line = r.readLine();
+ log.info("line:" + line);
+ // System.out.println("line:"+line);
+ Assert.assertTrue("validationg line : " + line, validator.apply(line));
+ }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/bigtop/blob/4fca4573/bigtop-bigpetstore/src/main/java/org/apache/bigtop/bigpetstore/etl/PigCSVCleaner.java
----------------------------------------------------------------------
diff --git a/bigtop-bigpetstore/src/main/java/org/apache/bigtop/bigpetstore/etl/PigCSVCleaner.java b/bigtop-bigpetstore/src/main/java/org/apache/bigtop/bigpetstore/etl/PigCSVCleaner.java
index 01ddd6e..0ca7444 100644
--- a/bigtop-bigpetstore/src/main/java/org/apache/bigtop/bigpetstore/etl/PigCSVCleaner.java
+++ b/bigtop-bigpetstore/src/main/java/org/apache/bigtop/bigpetstore/etl/PigCSVCleaner.java
@@ -6,7 +6,7 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -21,9 +21,8 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import org.apache.bigtop.bigpetstore.util.BigPetStoreConstants;
+import org.apache.bigtop.bigpetstore.util.BigPetStoreConstants.OUTPUTS;
import org.apache.bigtop.bigpetstore.util.DeveloperTools;
-import org.apache.bigtop.bigpetstore.util.NumericalIdUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -33,7 +32,7 @@ import org.apache.pig.ExecType;
import org.apache.pig.PigServer;
/**
- * This class operates by ETL'ing the dataset into pig.
+ * This class operates by ETL'ing the data-set into pig.
* The pigServer is persisted through the life of the class, so that the
* intermediate data sets created in the constructor can be reused.
*/
@@ -41,11 +40,12 @@ public class PigCSVCleaner {
PigServer pigServer;
+ private static Path getCleanedTsvPath(Path outputPath) {
+ return new Path(outputPath, OUTPUTS.tsv.name());
+ }
+
public PigCSVCleaner(Path inputPath, Path outputPath, ExecType ex, File... scripts)
throws Exception {
-
-
-
FileSystem fs = FileSystem.get(inputPath.toUri(), new Configuration());
if(! fs.exists(inputPath)){
@@ -61,36 +61,29 @@ public class PigCSVCleaner {
/**
* First, split the tabs up.
*
- * BigPetStore,storeCode_OK,2 yang,jay,Mon Dec 15 23:33:49 EST
- * 1969,69.56,flea collar
- *
- * ("BigPetStore,storeCode_OK,2",
- * "yang,jay,Mon Dec 15 23:33:49 EST 1969,69.56,flea collar")
+ * BigPetStore,storeCode_OK,2 1,yang,jay,3,flea collar,69.56,Mon Dec 15 23:33:49 EST 1969
*
- * BigPetStore,storeCode_AK,1 amanda,fitzgerald,Sat Dec 20 09:44:25 EET
- * 1969,7.5,cat-food
+ * ("BigPetStore,storeCode_OK,2", "1,yang,jay,3,flea collar,69.56,Mon Dec 15 23:33:49 EST 1969")
*/
- pigServer.registerQuery("csvdata = LOAD '<i>' AS (ID,DETAILS);"
- .replaceAll("<i>", inputPath.toString()));
+ pigServer.registerQuery("csvdata = LOAD '<i>' AS (ID,DETAILS);".replaceAll("<i>", inputPath.toString()));
+ // currentCustomerId, firstName, lastName, product.id, product.name.toLowerCase, product.price, date
/**
- * Now, we want to split the two tab delimited feidls into uniform
+ * Now, we want to split the two tab delimited fields into uniform
* fields of comma separated values. To do this, we 1) Internally split
* the FIRST and SECOND fields by commas "a,b,c" --> (a,b,c) 2) FLATTEN
* the FIRST and SECOND fields. (d,e) (a,b,c) -> d e a b c
*/
- pigServer
- .registerQuery(
- "id_details = FOREACH csvdata GENERATE "
- + "FLATTEN" + "(STRSPLIT(ID,',',3)) AS " +
- "(drop, code, transaction) ,"
-
- + "FLATTEN" + "(STRSPLIT(DETAILS,',',5)) AS " +
- "(lname, fname, date, price," +
- "product:chararray);");
-
- pigServer.store("id_details", outputPath.toString());
-
+ pigServer.registerQuery(
+ "id_details = FOREACH csvdata GENERATE "
+ + "FLATTEN(STRSPLIT(ID, ',', 3)) AS " +
+ "(drop, code, transaction) ,"
+
+ + "FLATTEN(STRSPLIT(DETAILS, ',', 7)) AS " +
+ "(custId, fname, lname, productId, product:chararray, price, date);");
+ pigServer.registerQuery("mahout_records = FOREACH id_details GENERATE custId, productId, 1;");
+ pigServer.store("id_details", getCleanedTsvPath(outputPath).toString());
+ pigServer.store("mahout_records", new Path(outputPath, OUTPUTS.MahoutPaths.Mahout.name()).toString());
/**
* Now we run scripts... this is where you can add some
* arbitrary analytics.
@@ -102,18 +95,13 @@ public class PigCSVCleaner {
*/
int i = 0;
for(File script : scripts) {
- Map<String,String> parameters = new HashMap<String,String>();
- parameters.put("input",
- outputPath.toString());
+ Map<String,String> parameters = new HashMap<>();
+ parameters.put("input", getCleanedTsvPath(outputPath).toString());
Path dir = outputPath.getParent();
- Path adHocOut=
- new Path(
- dir,
- BigPetStoreConstants.OUTPUTS.pig_ad_hoc_script.name()+(i++));
+ Path adHocOut = new Path(dir, OUTPUTS.pig_ad_hoc_script.name() + (i++));
System.out.println("Setting default output to " + adHocOut);
parameters.put("output", adHocOut.toString());
-
pigServer.registerScript(script.getAbsolutePath(), parameters);
}
}
@@ -123,7 +111,7 @@ public class PigCSVCleaner {
for(int i = startIndex ; i < args.length ; i++) {
File f = new File(args[i]);
if(! f.exists()) {
- throw new RuntimeException("Pig script arg " + i+ " " + f.getAbsolutePath() + " not found. ");
+ throw new RuntimeException("Pig script arg " + i + " " + f.getAbsolutePath() + " not found. ");
}
files.add(f);
}
@@ -133,14 +121,11 @@ public class PigCSVCleaner {
"Each one will be given $input and $output arguments.");
return files.toArray(new File[]{});
}
+
public static void main(final String[] args) throws Exception {
System.out.println("Starting pig etl " + args.length);
-
Configuration c = new Configuration();
- int res = ToolRunner.run(
- c,
-
- new Tool() {
+ int res = ToolRunner.run(c, new Tool() {
Configuration conf;
@Override
public void setConf(Configuration conf) {
http://git-wip-us.apache.org/repos/asf/bigtop/blob/4fca4573/bigtop-bigpetstore/src/main/java/org/apache/bigtop/bigpetstore/generator/BPSGenerator.java
----------------------------------------------------------------------
diff --git a/bigtop-bigpetstore/src/main/java/org/apache/bigtop/bigpetstore/generator/BPSGenerator.java b/bigtop-bigpetstore/src/main/java/org/apache/bigtop/bigpetstore/generator/BPSGenerator.java
index 3319064..6c8beef 100755
--- a/bigtop-bigpetstore/src/main/java/org/apache/bigtop/bigpetstore/generator/BPSGenerator.java
+++ b/bigtop-bigpetstore/src/main/java/org/apache/bigtop/bigpetstore/generator/BPSGenerator.java
@@ -6,7 +6,7 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -33,6 +33,7 @@ import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import static org.apache.bigtop.bigpetstore.generator.PetStoreTransactionsInputFormat.props;
/**
* This is a mapreduce implementation of a generator of a large sentiment
@@ -46,71 +47,62 @@ import org.slf4j.LoggerFactory;
*/
public class BPSGenerator {
- final static Logger log = LoggerFactory.getLogger(BPSGenerator.class);
-
- public enum props {
- // bigpetstore_splits,
- bigpetstore_records
- }
+ public static final int DEFAULT_NUM_RECORDS = 100;
- public static Job createJob(Path output, int records) throws IOException {
- Configuration c = new Configuration();
- c.setInt(props.bigpetstore_records.name(), 10);
- return createJob(output, c);
- }
+ final static Logger log = LoggerFactory.getLogger(BPSGenerator.class);
- public static Job createJob(Path output, Configuration conf)
- throws IOException {
- Job job = new Job(conf, "PetStoreTransaction_ETL_"
- + System.currentTimeMillis());
- // recursively delete the data set if it exists.
- FileSystem.get(output.toUri(),conf).delete(output, true);
- job.setJarByClass(BPSGenerator.class);
- job.setMapperClass(MyMapper.class);
- // use the default reducer
- // job.setReducerClass(PetStoreTransactionGeneratorJob.Red.class);
- job.setOutputKeyClass(Text.class);
- job.setOutputValueClass(Text.class);
- job.setMapOutputKeyClass(Text.class);
- job.setMapOutputValueClass(Text.class);
- job.setInputFormatClass(GeneratePetStoreTransactionsInputFormat.class);
- job.setOutputFormatClass(TextOutputFormat.class);
- FileOutputFormat.setOutputPath(job, output);
- return job;
- }
+ public enum props {
+ bigpetstore_records
+ }
- public static class MyMapper extends Mapper<Text, Text, Text, Text> {
+ public static Job createJob(Path output, int records) throws IOException {
+ Configuration c = new Configuration();
+ c.setInt(props.bigpetstore_records.name(), DEFAULT_NUM_RECORDS);
+ return getCreateTransactionRecordsJob(output, c);
+ }
- @Override
- protected void setup(Context context) throws IOException,
- InterruptedException {
- super.setup(context);
- }
+ public static Job getCreateTransactionRecordsJob(Path outputDir, Configuration conf)
+ throws IOException {
+ Job job = new Job(conf, "PetStoreTransaction_ETL_" + System.currentTimeMillis());
+ // recursively delete the data set if it exists.
+ FileSystem.get(outputDir.toUri(), conf).delete(outputDir, true);
+ job.setJarByClass(BPSGenerator.class);
+ job.setMapperClass(MyMapper.class);
+ // use the default reducer
+ // job.setReducerClass(PetStoreTransactionGeneratorJob.Red.class);
+ job.setOutputKeyClass(Text.class);
+ job.setOutputValueClass(Text.class);
+ job.setMapOutputKeyClass(Text.class);
+ job.setMapOutputValueClass(Text.class);
+ job.setInputFormatClass(PetStoreTransactionsInputFormat.class);
+ job.setOutputFormatClass(TextOutputFormat.class);
+ FileOutputFormat.setOutputPath(job, outputDir);
+ return job;
+ }
- protected void map(Text key, Text value, Context context)
- throws java.io.IOException, InterruptedException {
- context.write(key, value);
- // TODO: Add multiple outputs here which writes mock addresses for
- // generated users
- // to a corresponding data file.
- };
+ public static class MyMapper extends Mapper<Text, Text, Text, Text> {
+ @Override
+ protected void setup(Context context) throws IOException,
+ InterruptedException {
+ super.setup(context);
}
- public static void main(String args[]) throws Exception {
- if (args.length != 2) {
- System.err.println("USAGE : [number of records] [output path]");
- System.exit(0);
- } else {
- Configuration conf = new Configuration();
- DeveloperTools.validate(
- args,
- "# of records",
- "output path");
+ protected void map(Text key, Text value, Context context)
+ throws java.io.IOException, InterruptedException {
+ context.write(key, value);
+ }
+ }
- conf.setInt(
- GeneratePetStoreTransactionsInputFormat.props.bigpetstore_records.name(),
- Integer.parseInt(args[0]));
- createJob(new Path(args[1]), conf).waitForCompletion(true);
- }
+ public static void main(String args[]) throws Exception {
+ if (args.length != 2) {
+ System.err.println("USAGE : [number of records] [output path]");
+ System.exit(0);
+ } else {
+ Configuration conf = new Configuration();
+ DeveloperTools.validate(args, "# of records", "output path");
+ conf.setInt(PetStoreTransactionsInputFormat.props.bigpetstore_records.name(),
+ Integer.parseInt(args[0]));
+ getCreateTransactionRecordsJob(new Path(args[1]), conf).waitForCompletion(true);
}
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/bigtop/blob/4fca4573/bigtop-bigpetstore/src/main/java/org/apache/bigtop/bigpetstore/generator/CustomerGenerator.scala
----------------------------------------------------------------------
diff --git a/bigtop-bigpetstore/src/main/java/org/apache/bigtop/bigpetstore/generator/CustomerGenerator.scala b/bigtop-bigpetstore/src/main/java/org/apache/bigtop/bigpetstore/generator/CustomerGenerator.scala
new file mode 100644
index 0000000..ef4ffb7
--- /dev/null
+++ b/bigtop-bigpetstore/src/main/java/org/apache/bigtop/bigpetstore/generator/CustomerGenerator.scala
@@ -0,0 +1,80 @@
+package org.apache.bigtop.bigpetstore.generator
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.FileSystem
+import org.apache.bigtop.bigpetstore.generator.util.State
+import org.apache.hadoop.fs.Path
+import parquet.org.codehaus.jackson.format.DataFormatDetector
+import org.slf4j.LoggerFactory
+import java.util.{Collection => JavaCollection}
+import scala.collection.JavaConversions.asJavaCollection
+import java.util.Random
+import scala.collection.mutable.{HashMap, Set, MultiMap}
+import scala.collection.immutable.NumericRange
+
+/**
+ * This class generates random customer data. The generated customer
+ * ids will be consecutive. The client code that generates the transactions
+ * records needs to know the available customer ids. If we keep the customer
+ * ids consecutive here. we don't have to store those ids in memory, or perform
+ * costly lookups. Once we introduce something that allows efficient lookup
+ * of data, we can do something else as well.
+ *
+ * The generated customer ids will start from 1. So, if we have 100 customers,
+ * the ids will be [1, 100].
+ */
+class CustomerGenerator(val desiredCustomerCount: Int, val outputPath: Path) {
+ private val logger = LoggerFactory.getLogger(getClass)
+ private val random = new Random;
+ private val assertion = "The generateCustomerRecords() hasn't been called yet";
+ private var customerFileGenerated = false
+ private val _stateToCustomerIds = new HashMap[State, NumericRange[Long]]
+
+ def isCustomerFileGenrated = customerFileGenerated
+
+ def customerIds(state: State) = {
+ assert(customerFileGenerated, assertion)
+ _stateToCustomerIds(state)
+ }
+
+ def generateCustomerRecords() = {
+ val config = new Configuration
+ val fs = FileSystem.getLocal(config)
+
+ assert(!fs.exists(outputPath))
+
+ val outputStream = fs.create(outputPath)
+
+ var currentId: Long = 1
+ logger.info("Generating customer records at: {}", fs.pathToFile(outputPath))
+ for (state <- State.values();
+ stateCustomerCount = (state.probability * desiredCustomerCount) toLong;
+ random = new Random(state.hashCode);
+ i <- 1L to stateCustomerCount) {
+ val customerRecord = CustomerGenerator.createRecord(currentId, state, random);
+ logger.info("generated customer: {}", customerRecord)
+ outputStream.writeBytes(customerRecord)
+
+ if(i == 1) {
+ val stateCustomerIdRange = currentId until (currentId + stateCustomerCount);
+ _stateToCustomerIds += (state -> stateCustomerIdRange)
+ }
+ currentId += 1
+ }
+
+ println(_stateToCustomerIds)
+ outputStream.flush
+ outputStream.close
+ customerFileGenerated = true
+ }
+}
+
+object CustomerGenerator {
+ val OUTPUT_FILE_NAME = "customers"
+
+ private def createRecord(id: Long, state: State, r: Random) = {
+ val firstName = DataForger.firstName
+ val lastName = DataForger.lastName
+ s"$id\t${DataForger.firstName(r)}\t${DataForger.lastName(r)}\t${state.name}\n"
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/bigtop/blob/4fca4573/bigtop-bigpetstore/src/main/java/org/apache/bigtop/bigpetstore/generator/GeneratePetStoreTransactionsInputFormat.java
----------------------------------------------------------------------
diff --git a/bigtop-bigpetstore/src/main/java/org/apache/bigtop/bigpetstore/generator/GeneratePetStoreTransactionsInputFormat.java b/bigtop-bigpetstore/src/main/java/org/apache/bigtop/bigpetstore/generator/GeneratePetStoreTransactionsInputFormat.java
deleted file mode 100755
index a779428..0000000
--- a/bigtop-bigpetstore/src/main/java/org/apache/bigtop/bigpetstore/generator/GeneratePetStoreTransactionsInputFormat.java
+++ /dev/null
@@ -1,134 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.bigtop.bigpetstore.generator;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-
-import org.apache.bigtop.bigpetstore.generator.TransactionIteratorFactory.KeyVal;
-import org.apache.bigtop.bigpetstore.generator.TransactionIteratorFactory.STATE;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-
-/**
- * A simple input split that fakes input.
- */
-public class GeneratePetStoreTransactionsInputFormat extends
- FileInputFormat<Text, Text> {
-
- @Override
- public RecordReader<Text, Text> createRecordReader(
- final InputSplit inputSplit, TaskAttemptContext arg1)
- throws IOException, InterruptedException {
- return new RecordReader<Text, Text>() {
-
- @Override
- public void close() throws IOException {
-
- }
-
- /**
- * We need the "state" information to generate records. - Each state
- * has a probability associated with it, so that our data set can be
- * realistic (i.e. Colorado should have more transactions than rhode
- * island).
- *
- * - Each state also will its name as part of the key.
- *
- * - This task would be distributed, for example, into 50 nodes on a
- * real cluster, each creating the data for a given state.
- */
-
- // String storeCode = ((Split) inputSplit).storeCode;
- int records = ((PetStoreTransactionInputSplit) inputSplit).records;
- Iterator<KeyVal<String, String>> data = (new TransactionIteratorFactory(
- records, ((PetStoreTransactionInputSplit) inputSplit).state))
- .getData();
- KeyVal<String, String> currentRecord;
-
- @Override
- public Text getCurrentKey() throws IOException,
- InterruptedException {
- return new Text(currentRecord.key);
- }
-
- @Override
- public Text getCurrentValue() throws IOException,
- InterruptedException {
- return new Text(currentRecord.val);
- }
-
- @Override
- public void initialize(InputSplit arg0, TaskAttemptContext arg1)
- throws IOException, InterruptedException {
- }
-
- @Override
- public boolean nextKeyValue() throws IOException,
- InterruptedException {
- if (data.hasNext()) {
- currentRecord = data.next();
- return true;
- }
- return false;
- }
-
- @Override
- public float getProgress() throws IOException, InterruptedException {
- return 0f;
- }
-
- };
- }
-
- public enum props {
- // bigpetstore_splits,
- bigpetstore_records
- }
-
- @Override
- public List<InputSplit> getSplits(JobContext arg) throws IOException {
- int num_records_desired = arg
- .getConfiguration()
- .getInt(GeneratePetStoreTransactionsInputFormat.props.bigpetstore_records
- .name(), -1);
- if (num_records_desired == -1) {
- throw new RuntimeException(
- "# of total records not set in configuration object: "
- + arg.getConfiguration());
- }
-
- ArrayList<InputSplit> list = new ArrayList<InputSplit>();
-
- /**
- * Generator class will take a state as input and generate all the data
- * for that state.
- */
- for (TransactionIteratorFactory.STATE s : STATE.values()) {
- PetStoreTransactionInputSplit split = new PetStoreTransactionInputSplit(
- (int) (Math.ceil(num_records_desired * s.probability)), s);
- System.out.println(s + " _ " + split.records);
- list.add(split);
- }
- return list;
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/bigtop/blob/4fca4573/bigtop-bigpetstore/src/main/java/org/apache/bigtop/bigpetstore/generator/PetStoreTransactionInputSplit.java
----------------------------------------------------------------------
diff --git a/bigtop-bigpetstore/src/main/java/org/apache/bigtop/bigpetstore/generator/PetStoreTransactionInputSplit.java b/bigtop-bigpetstore/src/main/java/org/apache/bigtop/bigpetstore/generator/PetStoreTransactionInputSplit.java
index 9b32344..d350cc8 100755
--- a/bigtop-bigpetstore/src/main/java/org/apache/bigtop/bigpetstore/generator/PetStoreTransactionInputSplit.java
+++ b/bigtop-bigpetstore/src/main/java/org/apache/bigtop/bigpetstore/generator/PetStoreTransactionInputSplit.java
@@ -6,7 +6,7 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -19,7 +19,8 @@ import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
-import org.apache.bigtop.bigpetstore.generator.TransactionIteratorFactory.STATE;
+import org.apache.bigtop.bigpetstore.generator.util.State;
+import org.apache.commons.lang3.Range;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.InputSplit;
@@ -38,21 +39,26 @@ public class PetStoreTransactionInputSplit extends InputSplit implements
}
public int records;
- public STATE state;
+ public State state;
+ public Range<Long> customerIdRange;
- public PetStoreTransactionInputSplit(int records, STATE state) {
+ public PetStoreTransactionInputSplit(int records, Range<Long> customerIdRange, State state) {
this.records = records;
this.state = state;
+ this.customerIdRange = customerIdRange;
}
- public void readFields(DataInput arg0) throws IOException {
- records = arg0.readInt();
- state = STATE.valueOf(arg0.readUTF());
+ public void readFields(DataInput dataInputStream) throws IOException {
+ records = dataInputStream.readInt();
+ state = State.valueOf(dataInputStream.readUTF());
+ customerIdRange = Range.between(dataInputStream.readLong(), dataInputStream.readLong());
}
- public void write(DataOutput arg0) throws IOException {
- arg0.writeInt(records);
- arg0.writeUTF(state.name());
+ public void write(DataOutput dataOutputStream) throws IOException {
+ dataOutputStream.writeInt(records);
+ dataOutputStream.writeUTF(state.name());
+ dataOutputStream.writeLong(customerIdRange.getMinimum());
+ dataOutputStream.writeLong(customerIdRange.getMaximum());
}
@Override
@@ -62,6 +68,6 @@ public class PetStoreTransactionInputSplit extends InputSplit implements
@Override
public long getLength() throws IOException, InterruptedException {
- return 100;
+ return records;
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/bigtop/blob/4fca4573/bigtop-bigpetstore/src/main/java/org/apache/bigtop/bigpetstore/generator/PetStoreTransactionsInputFormat.java
----------------------------------------------------------------------
diff --git a/bigtop-bigpetstore/src/main/java/org/apache/bigtop/bigpetstore/generator/PetStoreTransactionsInputFormat.java b/bigtop-bigpetstore/src/main/java/org/apache/bigtop/bigpetstore/generator/PetStoreTransactionsInputFormat.java
new file mode 100755
index 0000000..4c22e36
--- /dev/null
+++ b/bigtop-bigpetstore/src/main/java/org/apache/bigtop/bigpetstore/generator/PetStoreTransactionsInputFormat.java
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bigtop.bigpetstore.generator;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.bigtop.bigpetstore.generator.TransactionIteratorFactory.KeyVal;
+import org.apache.bigtop.bigpetstore.generator.util.State;
+import org.apache.commons.lang3.Range;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+
+/**
+ * A simple input split that fakes input.
+ */
+public class PetStoreTransactionsInputFormat extends
+ FileInputFormat<Text, Text> {
+
+ @Override
+ public RecordReader<Text, Text> createRecordReader(
+ final InputSplit inputSplit, TaskAttemptContext arg1)
+ throws IOException, InterruptedException {
+ return new RecordReader<Text, Text>() {
+
+ @Override
+ public void close() throws IOException {
+
+ }
+
+ /**
+ * We need the "state" information to generate records. - Each state
+ * has a probability associated with it, so that our data set can be
+ * realistic (i.e. Colorado should have more transactions than rhode
+ * island).
+ *
+ * - Each state also will its name as part of the key.
+ *
+ * - This task would be distributed, for example, into 50 nodes on a
+ * real cluster, each creating the data for a given state.
+ */
+
+ PetStoreTransactionInputSplit bpsInputplit = (PetStoreTransactionInputSplit) inputSplit;
+ int records = bpsInputplit.records;
+ // TODO why not send the whole InputSplit there?
+ Iterator<KeyVal<String, String>> data =
+ (new TransactionIteratorFactory(records, bpsInputplit.customerIdRange, bpsInputplit.state)).data();
+ KeyVal<String, String> currentRecord;
+
+ @Override
+ public Text getCurrentKey() throws IOException,
+ InterruptedException {
+ return new Text(currentRecord.key());
+ }
+
+ @Override
+ public Text getCurrentValue() throws IOException,
+ InterruptedException {
+ return new Text(currentRecord.value());
+ }
+
+ @Override
+ public void initialize(InputSplit arg0, TaskAttemptContext arg1)
+ throws IOException, InterruptedException {
+ }
+
+ @Override
+ public boolean nextKeyValue() throws IOException,
+ InterruptedException {
+ if (data.hasNext()) {
+ currentRecord = data.next();
+ return true;
+ }
+ return false;
+ }
+
+ @Override
+ public float getProgress() throws IOException, InterruptedException {
+ return 0f;
+ }
+
+ };
+ }
+
+ public enum props {
+ bigpetstore_records
+ }
+
+ @Override
+ public List<InputSplit> getSplits(JobContext arg) throws IOException {
+ int numRecordsDesired = arg
+ .getConfiguration()
+ .getInt(PetStoreTransactionsInputFormat.props.bigpetstore_records
+ .name(), -1);
+ if (numRecordsDesired == -1) {
+ throw new RuntimeException(
+ "# of total records not set in configuration object: "
+ + arg.getConfiguration());
+ }
+
+ List<InputSplit> list = new ArrayList<InputSplit>();
+ long customerIdStart = 1;
+ for (State s : State.values()) {
+ int numRecords = numRecords(numRecordsDesired, s.probability);
+ // each state is assigned a range of customer-ids from which it can choose.
+ // The number of customers can be as many as the number of transactions.
+ Range<Long> customerIdRange = Range.between(customerIdStart, customerIdStart + numRecords - 1);
+ PetStoreTransactionInputSplit split =
+ new PetStoreTransactionInputSplit(numRecords, customerIdRange, s);
+ System.out.println(s + " _ " + split.records);
+ list.add(split);
+ customerIdStart += numRecords;
+ }
+ return list;
+ }
+
+ private int numRecords(int numRecordsDesired, float probability) {
+ return (int) (Math.ceil(numRecordsDesired * probability));
+ }
+}
\ No newline at end of file