You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bigtop.apache.org by ja...@apache.org on 2014/08/20 05:13:42 UTC

[1/2] BIGTOP-1272: Productionize the mahout recommender

Repository: bigtop
Updated Branches:
  refs/heads/master e9771e613 -> 4fca4573b


http://git-wip-us.apache.org/repos/asf/bigtop/blob/4fca4573/bigtop-bigpetstore/src/main/java/org/apache/bigtop/bigpetstore/generator/TransactionIteratorFactory.java
----------------------------------------------------------------------
diff --git a/bigtop-bigpetstore/src/main/java/org/apache/bigtop/bigpetstore/generator/TransactionIteratorFactory.java b/bigtop-bigpetstore/src/main/java/org/apache/bigtop/bigpetstore/generator/TransactionIteratorFactory.java
deleted file mode 100755
index 0ea81ee..0000000
--- a/bigtop-bigpetstore/src/main/java/org/apache/bigtop/bigpetstore/generator/TransactionIteratorFactory.java
+++ /dev/null
@@ -1,468 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.bigtop.bigpetstore.generator;
-
-
-import java.util.Date;
-import java.util.Iterator;
-import java.util.Random;
-
-import org.apache.bigtop.bigpetstore.util.Pair;
-import org.apache.bigtop.bigpetstore.util.StringUtils;
-
-/**
- * This class generates our data. Over time we will use it to embed bias which
- * can then be teased out, i.e. by clutstering/classifiers. For example:
- *
- * certain products <--> certain years or days
- *
- *
- */
-public class TransactionIteratorFactory {
-
-    /**
-     * Each "state" has a pet store , with a certain "proportion" of the
-     * transactions. In this case colorado represents the majority of the
-     * transactions.
-     */
-
-    public static enum STATE {
-
-        // Each product is separated with an _ for its base price.
-        // That is just to make it easy to add new products.
-        // Each state is associated with a relative probability.
-        AZ(.1f, "dog-food_10", "cat-food_8", "leather-collar_25",
-                "snake-bite ointment_30", "turtle-food_11"),
-        AK(.1f,
-                "dog-food_10", "cat-food_8", "fuzzy-collar_19",
-                "antelope-caller_20", "salmon-bait_30"),
-        CT(.1f, "dog-food_10",
-                "cat-food_8", "fuzzy-collar_19", "turtle-pellets_5"),
-        OK(.1f,
-                "dog-food_10", "cat-food_8", "duck-caller_13",
-                "rodent-cage_40", "hay-bail_5", "cow-dung_2"),
-        CO(.1f,
-                "dog-food_10", "cat-food_8", "choke-collar_15",
-                "antelope snacks_30", "duck-caller_18"),
-        CA(.3f, "dog-food_10",
-                "cat-food_8", "fish-food_12", "organic-dog-food_16",
-                "turtle-pellets_5"),
-        NY(.2f, "dog-food_10", "cat-food_8", "steel-leash_20",
-                "fish-food_20", "seal-spray_25");
-
-        public static Random rand = new Random();
-        public float probability;
-        public String[] products;
-
-        private STATE(float probability, String... products) {
-            this.probability = probability;
-            this.products = products;
-        }
-
-        public Pair<String, Integer> randProduct() {
-            String product = products[rand.nextInt(products.length - 1)];
-            String name = StringUtils.substringBefore(product, "_");
-            Integer basePrice = Integer.parseInt(StringUtils.substringAfter(
-                    product, "_"));
-            return new Pair(name, basePrice);
-        }
-
-    }
-
-    public static class KeyVal<K, V> {
-
-        public final K key;
-        public final V val;
-
-        public KeyVal(K key, V val) {
-            this.key = key;
-            this.val = val;
-        }
-    }
-
-    private Iterator<KeyVal<String, String>> dataIterator;
-
-    Random r;
-
-    public TransactionIteratorFactory(final int records, final STATE state) {
-
-        /**
-         * Random is seeded by STATE. This way similar names will be randomly
-         * selected for states .
-         */
-        r = new Random(state.hashCode());
-
-        if (records == 0) {
-            throw new RuntimeException(
-                    "Cant create a data iterator with no records (records==0) !");
-        }
-
-        this.dataIterator = new Iterator<KeyVal<String, String>>() {
-            int trans_id = 1;
-
-            @Override
-            public boolean hasNext() {
-                // TODO Auto-generated method stub
-                return trans_id <= records;
-            }
-
-            int repeat = 0;
-            String fname = randFirstName();
-            String lname = randLastName();
-
-            @Override
-            public KeyVal<String, String> next() {
-                /**
-                 * Some customers come back for more :) We repeat a name up to
-                 * ten times.
-                 */
-                if (repeat > 0)
-                    repeat--;
-                else {
-                    fname = randFirstName();
-                    lname = randLastName();
-                    repeat = (int) (r.nextGaussian() * 10f);
-                }
-                String key, val;
-                key = join(",", "BigPetStore", "storeCode_" + state.name(),
-                        trans_id++ + "");
-                Pair<String, Integer> product_price = state.randProduct();
-                val = join(
-                        ",",
-                        fname,
-                        lname,
-                        getDate().toString(),
-                        fudgePrice(product_price.getFirst(),
-                                product_price.getSecond())
-                                + "", product_price.getFirst()); // products are
-                                                                 // biased by
-                                                                 // state
-
-                return new KeyVal<String, String>(key, val);
-            }
-
-            @Override
-            public void remove() {
-                // TODO Auto-generated method stub
-
-            }
-
-        };
-    }
-
-    /**
-     * Add some decimals to the price;
-     *
-     * @param i
-     * @return
-     */
-    public Float fudgePrice(String product, Integer i) {
-        float f = (float) i;
-        if (product.contains("dog")) {
-            return i + .50f;
-        }
-        if (product.contains("cat")) {
-            return i - .50f;
-        }
-        if (product.contains("fish")) {
-            return i - .25f;
-        } else
-            return i + .10f;
-    }
-
-    static String join(String sep, String... strs) {
-        if (strs.length == 0) {
-            return "";
-        } else if (strs.length == 1) {
-            return strs[0];
-        }
-        String temp = strs[0]; // inefficient ... should probably use
-                               // StringBuilder instead
-        for (int i = 1; i < strs.length; i++) {
-            temp += "," + strs[i];
-        }
-        return temp;
-    }
-
-    public Iterator<KeyVal<String, String>> getData() {
-        return this.dataIterator;
-    }
-
-    private String randFirstName() {
-        return FIRSTNAMES[this.r.nextInt(FIRSTNAMES.length - 1)].toLowerCase();
-    }
-
-    private String randLastName() {
-        return LASTNAMES[this.r.nextInt(LASTNAMES.length - 1)].toLowerCase();
-    }
-
-    private Date getDate() {
-        return new Date(this.r.nextInt());
-    }
-
-    private Integer getPrice() {
-        return this.r.nextInt(MAX_PRICE);
-    }
-
-    public static final Integer MINUTES_IN_DAY = 60 * 24;
-    public static final Integer MAX_PRICE = 10000;
-
-    private static String[] FIRSTNAMES = { "Aaron", "Abby", "Abigail", "Adam",
-            "Alan", "Albert", "Alex", "Alexandra", "Alexis", "Alice", "Alicia",
-            "Alisha", "Alissa", "Allen", "Allison", "Alyssa", "Amanda",
-            "Amber", "Amy", "Andrea", "Andrew", "Andy", "Angel", "Angela",
-            "Angie", "Anita", "Ann", "Anna", "Annette", "Anthony", "Antonio",
-            "April", "Arthur", "Ashley", "Audrey", "Austin", "Autumn", "Baby",
-            "Barb", "Barbara", "Becky", "Benjamin", "Beth", "Bethany", "Betty",
-            "Beverly", "Bill", "Billie", "Billy", "Blake", "Bob", "Bobbie",
-            "Bobby", "Bonnie", "Brad", "Bradley", "Brady", "Brandi", "Brandon",
-            "Brandy", "Breanna", "Brenda", "Brent", "Brett", "Brian",
-            "Brianna", "Brittany", "Brooke", "Brooklyn", "Bruce", "Bryan",
-            "Caleb", "Cameron", "Candy", "Carl", "Carla", "Carmen", "Carol",
-            "Carolyn", "Carrie", "Casey", "Cassandra", "Catherine", "Cathy",
-            "Chad", "Charlene", "Charles", "Charlie", "Charlotte", "Chase",
-            "Chasity", "Chastity", "Chelsea", "Cheryl", "Chester", "Cheyenne",
-            "Chris", "Christian", "Christina", "Christine", "Christoph",
-            "Christopher", "Christy", "Chuck", "Cindy", "Clara", "Clarence",
-            "Clayton", "Clifford", "Clint", "Cody", "Colton", "Connie",
-            "Corey", "Cory", "Courtney", "Craig", "Crystal", "Curtis",
-            "Cynthia", "Dakota", "Dale", "Dallas", "Dalton", "Dan", "Dana",
-            "Daniel", "Danielle", "Danny", "Darla", "Darlene", "Darrell",
-            "Darren", "Dave", "David", "Dawn", "Dean", "Deanna", "Debbie",
-            "Deborah", "Debra", "Denise", "Dennis", "Derek", "Derrick",
-            "Destiny", "Devin", "Diana", "Diane", "Dillon", "Dixie", "Dominic",
-            "Don", "Donald", "Donna", "Donnie", "Doris", "Dorothy", "Doug",
-            "Douglas", "Drew", "Duane", "Dustin", "Dusty", "Dylan", "Earl",
-            "Ed", "Eddie", "Edward", "Elaine", "Elizabeth", "Ellen", "Emily",
-            "Eric", "Erica", "Erika", "Erin", "Ernest", "Ethan", "Eugene",
-            "Eva", "Evelyn", "Everett", "Faith", "Father", "Felicia", "Floyd",
-            "Francis", "Frank", "Fred", "Gabriel", "Gage", "Gail", "Gary",
-            "Gene", "George", "Gerald", "Gina", "Ginger", "Glen", "Glenn",
-            "Gloria", "Grace", "Greg", "Gregory", "Haley", "Hannah", "Harley",
-            "Harold", "Harry", "Heath", "Heather", "Heidi", "Helen", "Herbert",
-            "Holly", "Hope", "Howard", "Hunter", "Ian", "Isaac", "Jack",
-            "Jackie", "Jacob", "Jade", "Jake", "James", "Jamie", "Jan", "Jane",
-            "Janet", "Janice", "Jared", "Jasmine", "Jason", "Jay", "Jean",
-            "Jeannie", "Jeff", "Jeffery", "Jeffrey", "Jenna", "Jennifer",
-            "Jenny", "Jeremiah", "Jeremy", "Jerry", "Jesse", "Jessica",
-            "Jessie", "Jill", "Jim", "Jimmy", "Joann", "Joanne", "Jodi",
-            "Jody", "Joe", "Joel", "Joey", "John", "Johnathan", "Johnny",
-            "Jon", "Jonathan", "Jonathon", "Jordan", "Joseph", "Josh",
-            "Joshua", "Joyce", "Juanita", "Judy", "Julia", "Julie", "Justin",
-            "Kaitlyn", "Karen", "Katelyn", "Katherine", "Kathleen", "Kathryn",
-            "Kathy", "Katie", "Katrina", "Kay", "Kayla", "Kaylee", "Keith",
-            "Kelly", "Kelsey", "Ken", "Kendra", "Kenneth", "Kenny", "Kevin",
-            "Kim", "Kimberly", "Kris", "Krista", "Kristen", "Kristin",
-            "Kristina", "Kristy", "Kyle", "Kylie", "Lacey", "Laken", "Lance",
-            "Larry", "Laura", "Lawrence", "Leah", "Lee", "Leonard", "Leroy",
-            "Leslie", "Levi", "Lewis", "Linda", "Lindsay", "Lindsey", "Lisa",
-            "Lloyd", "Logan", "Lois", "Loretta", "Lori", "Louis", "Lynn",
-            "Madison", "Mandy", "Marcus", "Margaret", "Maria", "Mariah",
-            "Marie", "Marilyn", "Marion", "Mark", "Marlene", "Marsha",
-            "Martha", "Martin", "Marty", "Marvin", "Mary", "Mary ann", "Mason",
-            "Matt", "Matthew", "Max", "Megan", "Melanie", "Melinda", "Melissa",
-            "Melody", "Michael", "Michelle", "Mickey", "Mike", "Mindy",
-            "Miranda", "Misty", "Mitchell", "Molly", "Monica", "Morgan",
-            "Mother", "Myron", "Nancy", "Natasha", "Nathan", "Nicholas",
-            "Nick", "Nicole", "Nina", "Noah", "Norma", "Norman", "Olivia",
-            "Paige", "Pam", "Pamela", "Pat", "Patricia", "Patrick", "Patty",
-            "Paul", "Paula", "Peggy", "Penny", "Pete", "Phillip", "Phyllis",
-            "Rachael", "Rachel", "Ralph", "Randall", "Randi", "Randy", "Ray",
-            "Raymond", "Rebecca", "Regina", "Renee", "Rex", "Rhonda",
-            "Richard", "Rick", "Ricky", "Rita", "Rob", "Robbie", "Robert",
-            "Roberta", "Robin", "Rochelle", "Rocky", "Rod", "Rodney", "Roger",
-            "Ron", "Ronald", "Ronda", "Ronnie", "Rose", "Roxanne", "Roy",
-            "Russ", "Russell", "Rusty", "Ruth", "Ryan", "Sabrina", "Sally",
-            "Sam", "Samantha", "Samuel", "Sandra", "Sandy", "Sara", "Sarah",
-            "Savannah", "Scott", "Sean", "Seth", "Shanda", "Shane", "Shanna",
-            "Shannon", "Sharon", "Shaun", "Shawn", "Shawna", "Sheila",
-            "Shelly", "Sher", "Sherri", "Sherry", "Shirley", "Sierra",
-            "Skyler", "Stacey", "Stacy", "Stanley", "Stephanie", "Stephen",
-            "Steve", "Steven", "Sue", "Summer", "Susan", "Sydney", "Tabatha",
-            "Tabitha", "Tamara", "Tammy", "Tara", "Tasha", "Tashia", "Taylor",
-            "Ted", "Teresa", "Terri", "Terry", "Tessa", "Thelma", "Theresa",
-            "Thomas", "Tia", "Tiffany", "Tim", "Timmy", "Timothy", "Tina",
-            "Todd", "Tom", "Tommy", "Toni", "Tony", "Tonya", "Tracey",
-            "Tracie", "Tracy", "Travis", "Trent", "Trevor", "Trey", "Trisha",
-            "Tristan", "Troy", "Tyler", "Tyrone", "Unborn", "Valerie",
-            "Vanessa", "Vernon", "Veronica", "Vicki", "Vickie", "Vicky",
-            "Victor", "Victoria", "Vincent", "Virginia", "Vivian", "Walter",
-            "Wanda", "Wayne", "Wendy", "Wesley", "Whitney", "William",
-            "Willie", "Wyatt", "Zachary" };
-
-    public static String[] LASTNAMES = { "Abbott", "Acevedo", "Acosta",
-            "Adams", "Adkins", "Aguilar", "Aguirre", "Albert", "Alexander",
-            "Alford", "Allen", "Allison", "Alston", "Alvarado", "Alvarez",
-            "Anderson", "Andrews", "Anthony", "Armstrong", "Arnold", "Ashley",
-            "Atkins", "Atkinson", "Austin", "Avery", "Avila", "Ayala", "Ayers",
-            "Bailey", "Baird", "Baker", "Baldwin", "Ball", "Ballard", "Banks",
-            "Barber", "Smith", "Johnson", "Williams", "Jones", "Brown",
-            "Davis", "Miller", "Wilson", "Moore", "Taylor", "Thomas",
-            "Jackson", "Barker", "Barlow", "Barnes", "Barnett", "Barr",
-            "Barrera", "Barrett", "Barron", "Barry", "Bartlett", "Barton",
-            "Bass", "Bates", "Battle", "Bauer", "Baxter", "Beach", "Bean",
-            "Beard", "Beasley", "Beck", "Becker", "Bell", "Bender", "Benjamin",
-            "Bennett", "Benson", "Bentley", "Benton", "Berg", "Berger",
-            "Bernard", "Berry", "Best", "Bird", "Bishop", "Black", "Blackburn",
-            "Blackwell", "Blair", "Blake", "Blanchard", "Blankenship",
-            "Blevins", "Bolton", "Bond", "Bonner", "Booker", "Boone", "Booth",
-            "Bowen", "Bowers", "Bowman", "Boyd", "Boyer", "Boyle", "Bradford",
-            "Bradley", "Bradshaw", "Brady", "Branch", "Bray", "Brennan",
-            "Brewer", "Bridges", "Briggs", "Bright", "Britt", "Brock",
-            "Brooks", "Browning", "Bruce", "Bryan", "Bryant", "Buchanan",
-            "Buck", "Buckley", "Buckner", "Bullock", "Burch", "Burgess",
-            "Burke", "Burks", "Burnett", "Burns", "Burris", "Burt", "Burton",
-            "Bush", "Butler", "Byers", "Byrd", "Cabrera", "Cain", "Calderon",
-            "Caldwell", "Calhoun", "Callahan", "Camacho", "Cameron",
-            "Campbell", "Campos", "Cannon", "Cantrell", "Cantu", "Cardenas",
-            "Carey", "Carlson", "Carney", "Carpenter", "Carr", "Carrillo",
-            "Carroll", "Carson", "Carter", "Carver", "Case", "Casey", "Cash",
-            "Castaneda", "Castillo", "Castro", "Cervantes", "Chambers", "Chan",
-            "Chandler", "Chaney", "Chang", "Chapman", "Charles", "Chase",
-            "Chavez", "Chen", "Cherry", "Christensen", "Christian", "Church",
-            "Clark", "Clarke", "Clay", "Clayton", "Clements", "Clemons",
-            "Cleveland", "Cline", "Cobb", "Cochran", "Coffey", "Cohen", "Cole",
-            "Coleman", "Collier", "Collins", "Colon", "Combs", "Compton",
-            "Conley", "Conner", "Conrad", "Contreras", "Conway", "Cook",
-            "Cooke", "Cooley", "Cooper", "Copeland", "Cortez", "Cote",
-            "Cotton", "Cox", "Craft", "Craig", "Crane", "Crawford", "Crosby",
-            "Cross", "Cruz", "Cummings", "Cunningham", "Curry", "Curtis",
-            "Dale", "Dalton", "Daniel", "Daniels", "Daugherty", "Davenport",
-            "David", "Davidson", "Dawson", "Day", "Dean", "Decker", "Dejesus",
-            "Delacruz", "Delaney", "Deleon", "Delgado", "Dennis", "Diaz",
-            "Dickerson", "Dickinson", "Dillard", "Dillon", "Dixon", "Dodson",
-            "Dominguez", "Donaldson", "Donovan", "Dorsey", "Dotson", "Douglas",
-            "Downs", "Doyle", "Drake", "Dudley", "Duffy", "Duke", "Duncan",
-            "Dunlap", "Dunn", "Duran", "Durham", "Dyer", "Eaton", "Edwards",
-            "Elliott", "Ellis", "Ellison", "Emerson", "England", "English",
-            "Erickson", "Espinoza", "Estes", "Estrada", "Evans", "Everett",
-            "Ewing", "Farley", "Farmer", "Farrell", "Faulkner", "Ferguson",
-            "Fernandez", "Ferrell", "Fields", "Figueroa", "Finch", "Finley",
-            "Fischer", "Fisher", "Fitzgerald", "Fitzpatrick", "Fleming",
-            "Fletcher", "Flores", "Flowers", "Floyd", "Flynn", "Foley",
-            "Forbes", "Ford", "Foreman", "Foster", "Fowler", "Fox", "Francis",
-            "Franco", "Frank", "Franklin", "Franks", "Frazier", "Frederick",
-            "Freeman", "French", "Frost", "Fry", "Frye", "Fuentes", "Fuller",
-            "Fulton", "Gaines", "Gallagher", "Gallegos", "Galloway", "Gamble",
-            "Garcia", "Gardner", "Garner", "Garrett", "Garrison", "Garza",
-            "Gates", "Gay", "Gentry", "George", "Gibbs", "Gibson", "Gilbert",
-            "Giles", "Gill", "Gillespie", "Gilliam", "Gilmore", "Glass",
-            "Glenn", "Glover", "Goff", "Golden", "Gomez", "Gonzales",
-            "Gonzalez", "Good", "Goodman", "Goodwin", "Gordon", "Gould",
-            "Graham", "Grant", "Graves", "Gray", "Green", "Greene", "Greer",
-            "Gregory", "Griffin", "Griffith", "Grimes", "Gross", "Guerra",
-            "Guerrero", "Guthrie", "Gutierrez", "Guy", "Guzman", "Hahn",
-            "Hale", "Haley", "Hall", "Hamilton", "Hammond", "Hampton",
-            "Hancock", "Haney", "Hansen", "Hanson", "Hardin", "Harding",
-            "Hardy", "Harmon", "Harper", "Harris", "Harrington", "Harrison",
-            "Hart", "Hartman", "Harvey", "Hatfield", "Hawkins", "Hayden",
-            "Hayes", "Haynes", "Hays", "Head", "Heath", "Hebert", "Henderson",
-            "Hendricks", "Hendrix", "Henry", "Hensley", "Henson", "Herman",
-            "Hernandez", "Herrera", "Herring", "Hess", "Hester", "Hewitt",
-            "Hickman", "Hicks", "Higgins", "Hill", "Hines", "Hinton", "Hobbs",
-            "Hodge", "Hodges", "Hoffman", "Hogan", "Holcomb", "Holden",
-            "Holder", "Holland", "Holloway", "Holman", "Holmes", "Holt",
-            "Hood", "Hooper", "Hoover", "Hopkins", "Hopper", "Horn", "Horne",
-            "Horton", "House", "Houston", "Howard", "Howe", "Howell",
-            "Hubbard", "Huber", "Hudson", "Huff", "Huffman", "Hughes", "Hull",
-            "Humphrey", "Hunt", "Hunter", "Hurley", "Hurst", "Hutchinson",
-            "Hyde", "Ingram", "Irwin", "Jacobs", "Jacobson", "James", "Jarvis",
-            "Jefferson", "Jenkins", "Jennings", "Jensen", "Jimenez", "Johns",
-            "Johnston", "Jordan", "Joseph", "Joyce", "Joyner", "Juarez",
-            "Justice", "Kane", "Kaufman", "Keith", "Keller", "Kelley", "Kelly",
-            "Kemp", "Kennedy", "Kent", "Kerr", "Key", "Kidd", "Kim", "King",
-            "Kinney", "Kirby", "Kirk", "Kirkland", "Klein", "Kline", "Knapp",
-            "Knight", "Knowles", "Knox", "Koch", "Kramer", "Lamb", "Lambert",
-            "Lancaster", "Landry", "Lane", "Lang", "Langley", "Lara", "Larsen",
-            "Larson", "Lawrence", "Lawson", "Le", "Leach", "Leblanc", "Lee",
-            "Leon", "Leonard", "Lester", "Levine", "Levy", "Lewis", "Lindsay",
-            "Lindsey", "Little", "Livingston", "Lloyd", "Logan", "Long",
-            "Lopez", "Lott", "Love", "Lowe", "Lowery", "Lucas", "Luna",
-            "Lynch", "Lynn", "Lyons", "Macdonald", "Macias", "Mack", "Madden",
-            "Maddox", "Maldonado", "Malone", "Mann", "Manning", "Marks",
-            "Marquez", "Marsh", "Marshall", "Martin", "Martinez", "Mason",
-            "Massey", "Mathews", "Mathis", "Matthews", "Maxwell", "May",
-            "Mayer", "Maynard", "Mayo", "Mays", "McBride", "McCall",
-            "McCarthy", "McCarty", "McClain", "McClure", "McConnell",
-            "McCormick", "McCoy", "McCray", "McCullough", "McDaniel",
-            "McDonald", "McDowell", "McFadden", "McFarland", "McGee",
-            "McGowan", "McGuire", "McIntosh", "McIntyre", "McKay", "McKee",
-            "McKenzie", "McKinney", "McKnight", "McLaughlin", "McLean",
-            "McLeod", "McMahon", "McMillan", "McNeil", "McPherson", "Meadows",
-            "Medina", "Mejia", "Melendez", "Melton", "Mendez", "Mendoza",
-            "Mercado", "Mercer", "Merrill", "Merritt", "Meyer", "Meyers",
-            "Michael", "Middleton", "Miles", "Mills", "Miranda", "Mitchell",
-            "Molina", "Monroe", "Montgomery", "Montoya", "Moody", "Moon",
-            "Mooney", "Morales", "Moran", "Moreno", "Morgan", "Morin",
-            "Morris", "Morrison", "Morrow", "Morse", "Morton", "Moses",
-            "Mosley", "Moss", "Mueller", "Mullen", "Mullins", "Munoz",
-            "Murphy", "Murray", "Myers", "Nash", "Navarro", "Neal", "Nelson",
-            "Newman", "Newton", "Nguyen", "Nichols", "Nicholson", "Nielsen",
-            "Nieves", "Nixon", "Noble", "Noel", "Nolan", "Norman", "Norris",
-            "Norton", "Nunez", "Obrien", "Ochoa", "Oconnor", "Odom",
-            "Odonnell", "Oliver", "Olsen", "Olson", "O'neal", "O'neil",
-            "O'neill", "Orr", "Ortega", "Ortiz", "Osborn", "Osborne", "Owen",
-            "Owens", "Pace", "Pacheco", "Padilla", "Page", "Palmer", "Park",
-            "Parker", "Parks", "Parrish", "Parsons", "Pate", "Patel",
-            "Patrick", "Patterson", "Patton", "Paul", "Payne", "Pearson",
-            "Peck", "Pena", "Pennington", "Perez", "Perkins", "Perry",
-            "Peters", "Petersen", "Peterson", "Petty", "Phelps", "Phillips",
-            "Pickett", "Pierce", "Pittman", "Pitts", "Pollard", "Poole",
-            "Pope", "Porter", "Potter", "Potts", "Powell", "Powers", "Pratt",
-            "Preston", "Price", "Prince", "Pruitt", "Puckett", "Pugh", "Quinn",
-            "Ramirez", "Ramos", "Ramsey", "Randall", "Randolph", "Rasmussen",
-            "Ratliff", "Ray", "Raymond", "Reed", "Reese", "Reeves", "Reid",
-            "Reilly", "Reyes", "Reynolds", "Rhodes", "Rice", "Rich", "Richard",
-            "Richards", "Richardson", "Richmond", "Riddle", "Riggs", "Riley",
-            "Rios", "Rivas", "Rivera", "Rivers", "Roach", "Robbins",
-            "Roberson", "Roberts", "Robertson", "Robinson", "Robles", "Rocha",
-            "Rodgers", "Rodriguez", "Rodriquez", "Rogers", "Rojas", "Rollins",
-            "Roman", "Romero", "Rosa", "Rosales", "Rosario", "Rose", "Ross",
-            "Roth", "Rowe", "Rowland", "Roy", "Ruiz", "Rush", "Russell",
-            "Russo", "Rutledge", "Ryan", "Salas", "Salazar", "Salinas",
-            "Sampson", "Sanchez", "Sanders", "Sandoval", "Sanford", "Santana",
-            "Santiago", "Santos", "Sargent", "Saunders", "Savage", "Sawyer",
-            "Schmidt", "Schneider", "Schroeder", "Schultz", "Schwartz",
-            "Scott", "Sears", "Sellers", "Serrano", "Sexton", "Shaffer",
-            "Shannon", "Sharp", "Sharpe", "Shaw", "Shelton", "Shepard",
-            "Shepherd", "Sheppard", "Sherman", "Shields", "Short", "Silva",
-            "Simmons", "Simon", "Simpson", "Sims", "Singleton", "Skinner",
-            "Slater", "Sloan", "Small", "Snider", "Snow", "Snyder", "Solis",
-            "Solomon", "Sosa", "Soto", "Sparks", "Spears", "Spence", "Spencer",
-            "Stafford", "Stanley", "Stanton", "Stark", "Steele", "Stein",
-            "Stephens", "Stephenson", "Stevens", "Stevenson", "Stewart",
-            "Stokes", "Stone", "Stout", "Strickland", "Strong", "Stuart",
-            "Suarez", "Sullivan", "Summers", "Sutton", "Swanson", "Sweeney",
-            "Sweet", "Sykes", "Talley", "Tanner", "Tate", "Terrell", "Terry",
-            "Thompson", "Thornton", "Tillman", "Todd", "Torres", "Townsend",
-            "Tran", "Travis", "Trevino", "Trujillo", "Tucker", "Turner",
-            "Tyler", "Tyson", "Underwood", "Valdez", "Valencia", "Valentine",
-            "Valenzuela", "Vance", "Vang", "Vargas", "Vasquez", "Vaughan",
-            "Vaughn", "Vazquez", "Vega", "Velasquez", "Velazquez", "Velez",
-            "Van halen", "Vincent", "Vinson", "Wade", "Wagner", "Walker",
-            "Wall", "Wallace", "Waller", "Walls", "Walsh", "Walter", "Walters",
-            "Walton", "Ward", "Ware", "Warner", "Warren", "Washington",
-            "Waters", "Watkins", "Watson", "Watts", "Weaver", "Webb", "Weber",
-            "Webster", "Weeks", "Weiss", "Welch", "Wells", "West", "Wheeler",
-            "Whitaker", "White", "Whitehead", "Whitfield", "Whitley",
-            "Whitney", "Wiggins", "Wilcox", "Wilder", "Wiley", "Wilkerson",
-            "Wilkins", "Wilkinson", "William", "Williamson", "Willis",
-            "Winters", "Wise", "Witt", "Wolf", "Wolfe", "Wong", "Wood",
-            "Woodard", "Woods", "Woodward", "Wooten", "Workman", "Wright",
-            "Wyatt", "Wynn", "Yang", "Yates", "York", "Young", "Zamora",
-            "Zimmerman"
-    };
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/bigtop/blob/4fca4573/bigtop-bigpetstore/src/main/java/org/apache/bigtop/bigpetstore/generator/util/Product.java
----------------------------------------------------------------------
diff --git a/bigtop-bigpetstore/src/main/java/org/apache/bigtop/bigpetstore/generator/util/Product.java b/bigtop-bigpetstore/src/main/java/org/apache/bigtop/bigpetstore/generator/util/Product.java
new file mode 100644
index 0000000..9ef3d67
--- /dev/null
+++ b/bigtop-bigpetstore/src/main/java/org/apache/bigtop/bigpetstore/generator/util/Product.java
@@ -0,0 +1,63 @@
+package org.apache.bigtop.bigpetstore.generator.util;
+
+import java.math.BigDecimal;
+import static org.apache.bigtop.bigpetstore.generator.util.ProductType.*;
+
+public enum Product {
+  DOG_FOOD(DOG, 10.50),
+  ORGANIC_DOG_FOOD(DOG, 16.99),
+  STEEL_LEASH(DOG, 19.99),
+  FUZZY_COLLAR(DOG, 24.90),
+  LEATHER_COLLAR(DOG, 18.90),
+  CHOKE_COLLAR(DOG, 15.50),
+  DOG_HOUSE(DOG, 109.99),
+  CHEWY_BONE(DOG, 20.10),
+  DOG_VEST(DOG, 19.99),
+  DOG_SOAP(DOG, 5.45),
+
+  CAT_FOOD(CAT, 7.50),
+  FEEDER_BOWL(CAT, 10.99),
+  LITTER_BOX(CAT, 24.95),
+  CAT_COLLAR(CAT, 7.95),
+  CAT_BLANKET(CAT, 14.49),
+
+  TURTLE_PELLETS(TURTLE, 4.95),
+  TURTLE_FOOD(TURTLE, 10.90),
+  TURTLE_TUB(TURTLE, 40.45),
+
+  FISH_FOOD(FISH, 12.50),
+  SALMON_BAIT(FISH, 29.95),
+  FISH_BOWL(FISH, 20.99),
+  AIR_PUMP(FISH, 13.95),
+  FILTER(FISH, 34.95),
+
+  DUCK_COLLAR(DUCK, 13.25),
+  DUCK_FOOD(DUCK, 20.25),
+  WADING_POOL(DUCK, 45.90);
+
+  /*
+  ANTELOPE_COLLAR(OTHER, 19.90),
+  ANTELOPE_SNACKS(OTHER, 29.25),
+  RODENT_CAGE(OTHER, 39.95),
+  HAY_BALE(OTHER, 4.95),
+  COW_DUNG(OTHER, 1.95),
+  SEAL_SPRAY(OTHER, 24.50),
+  SNAKE_BITE_OINTMENT(OTHER, 29.90);
+  */
+  private final BigDecimal price;
+  public final ProductType productType;
+  private Product(ProductType productType, double price) {
+    this.price = BigDecimal.valueOf(price);
+    this.productType = productType;
+  }
+
+  public int id() {
+    return this.ordinal();
+  }
+
+  public BigDecimal price() {
+    return this.price;
+  }
+
+
+}

http://git-wip-us.apache.org/repos/asf/bigtop/blob/4fca4573/bigtop-bigpetstore/src/main/java/org/apache/bigtop/bigpetstore/generator/util/ProductType.java
----------------------------------------------------------------------
diff --git a/bigtop-bigpetstore/src/main/java/org/apache/bigtop/bigpetstore/generator/util/ProductType.java b/bigtop-bigpetstore/src/main/java/org/apache/bigtop/bigpetstore/generator/util/ProductType.java
new file mode 100644
index 0000000..f41b604
--- /dev/null
+++ b/bigtop-bigpetstore/src/main/java/org/apache/bigtop/bigpetstore/generator/util/ProductType.java
@@ -0,0 +1,29 @@
+package org.apache.bigtop.bigpetstore.generator.util;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+public enum ProductType {
+  DOG, CAT, TURTLE, FISH, DUCK;
+
+  private List<Product> products;
+
+  public List<Product> getProducts() {
+    if(products == null) {
+      generateProductList();
+    }
+    return products;
+  }
+
+  private void generateProductList() {
+    List<Product> products = new ArrayList<>();
+    for(Product p : Product.values()) {
+      if(p.productType == this) {
+        products.add(p);
+      }
+    }
+    this.products = Collections.unmodifiableList(products);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/bigtop/blob/4fca4573/bigtop-bigpetstore/src/main/java/org/apache/bigtop/bigpetstore/generator/util/State.java
----------------------------------------------------------------------
diff --git a/bigtop-bigpetstore/src/main/java/org/apache/bigtop/bigpetstore/generator/util/State.java b/bigtop-bigpetstore/src/main/java/org/apache/bigtop/bigpetstore/generator/util/State.java
new file mode 100644
index 0000000..f2b845a
--- /dev/null
+++ b/bigtop-bigpetstore/src/main/java/org/apache/bigtop/bigpetstore/generator/util/State.java
@@ -0,0 +1,26 @@
+package org.apache.bigtop.bigpetstore.generator.util;
+
+import java.util.Random;
+
+
+/**
+ * Each "state" has a pet store, with a certain "proportion" of the
+ * transactions.
+ */
+public enum State {
+  // Each state is associated with a relative probability.
+  AZ(.1f),
+  AK(.1f),
+  CT(.1f),
+  OK(.1f),
+  CO(.1f),
+  CA(.3f),
+  NY(.2f);
+
+  public static Random rand = new Random();
+  public float probability;
+
+  private State(float probability) {
+    this.probability = probability;
+  }
+}

http://git-wip-us.apache.org/repos/asf/bigtop/blob/4fca4573/bigtop-bigpetstore/src/main/java/org/apache/bigtop/bigpetstore/recommend/ItemRecommender.scala
----------------------------------------------------------------------
diff --git a/bigtop-bigpetstore/src/main/java/org/apache/bigtop/bigpetstore/recommend/ItemRecommender.scala b/bigtop-bigpetstore/src/main/java/org/apache/bigtop/bigpetstore/recommend/ItemRecommender.scala
new file mode 100644
index 0000000..0ec240e
--- /dev/null
+++ b/bigtop-bigpetstore/src/main/java/org/apache/bigtop/bigpetstore/recommend/ItemRecommender.scala
@@ -0,0 +1,103 @@
+package org.apache.bigtop.bigpetstore.recommend
+
+import org.apache.mahout.cf.taste.hadoop.als.RecommenderJob
+import org.apache.mahout.cf.taste.hadoop.als.ParallelALSFactorizationJob
+import java.io.File
+import parquet.org.codehaus.jackson.map.DeserializerFactory.Config
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.conf.Configurable
+import org.apache.hadoop.util.ToolRunner
+import org.apache.mahout.cf.taste.hadoop.als.SharingMapper
+import org.apache.hadoop.util.Tool
+import org.apache.bigtop.bigpetstore.util.DeveloperTools
+
+// We don't need to wrap these two jobs in ToolRunner.run calls since the only
+// thing that we are doing right now is calling the run() methods of RecommenderJob
+// and ParallelALSFactorizationJob. Both of these classes have a main() method that
+// internally calls ToolRunner.run with all the command line args passed. So, if
+// we want to run this code from the command line, we can easily do so by running
+// the main methods of the ParallelALSFactorizationJob, followed by running the
+// main method of RecommenderJob. That would also avoid the multiple-JVM-instance
+// issue mentioned in the comments below, making resetFlagInSharedAlsMapper unnecessary.
+class ItemRecommender(private val inputDir: String,
+        private val factorizationOutputDir: String,
+        private val recommendationsOutputDir: String) {
+  private val recommenderJob = new RecommenderJob
+  private val factorizationJob = new ParallelALSFactorizationJob
+
+  private def tempDir = "/tmp/mahout_" + System.currentTimeMillis
+
+  private def performAlsFactorization() = {
+    ToolRunner.run(factorizationJob, Array(
+        "--input", inputDir,
+        "--output", factorizationOutputDir,
+        "--lambda", "0.1",
+        "--tempDir", tempDir,
+        "--implicitFeedback", "false",
+        "--alpha", "0.8",
+        "--numFeatures", "2",
+        "--numIterations", "5",
+        "--numThreadsPerSolver", "1"))
+  }
+
+  private def generateRecommendations() = {
+    ToolRunner.run(recommenderJob, (Array(
+        "--input", factorizationOutputDir + "/userRatings/",
+        "--userFeatures", factorizationOutputDir + "/U/",
+        "--itemFeatures", factorizationOutputDir + "/M/",
+        "--numRecommendations", "1",
+        "--output", recommendationsOutputDir,
+        "--maxRating", "1")))
+  }
+
+  // At this point, performAlsFactorization and generateRecommendations
+  // cannot be run one after the other in the same VM instance as-is. These two
+  // jobs share a common static variable which is not being handled correctly.
+  // This, unfortunately, results in a class-cast exception being thrown. That's
+  // why the resetFlagInSharedAlsMapper call is required. See the comments on
+  // the resetFlagInSharedAlsMapper() method.
+  def recommend = {
+    performAlsFactorization
+    resetFlagInSharedAlsMapper
+    generateRecommendations
+  }
+
+  // necessary for local execution in the same JVM only. If the performAlsFactorization()
+  // and generateRecommendations() calls are performed in separate JVM instances, this
+  // would be taken care of automatically. However, if we want to run these two methods
+  // as one task, we need to clean up the static state set by these methods, and we don't
+  // have any legitimate way of doing this directly. This clean-up should have been
+  // performed by ParallelALSFactorizationJob class after the job is finished.
+  // TODO: remove this when a better way comes along, or ParallelALSFactorizationJob
+  // takes responsibility.
+  private def resetFlagInSharedAlsMapper {
+    val m = classOf[SharingMapper[_, _, _, _, _]].getDeclaredMethod("reset");
+    m setAccessible true
+    m.invoke(null)
+  }
+}
+
+object ItemRecommender {
+  def main(args: Array[String]) {
+      val res = ToolRunner.run(new Configuration(), new Tool() {
+      var conf: Configuration = _;
+
+      override def setConf(conf: Configuration) {
+        this.conf=conf;
+      }
+
+
+      override def getConf() = {
+        this.conf;
+      }
+
+
+      override def run(toolArgs: Array[String]) = {
+        val ir = new ItemRecommender(toolArgs(0), toolArgs(1), toolArgs(2))
+        ir.recommend
+        0;
+      }
+    }, args);
+    System.exit(res);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/bigtop/blob/4fca4573/bigtop-bigpetstore/src/main/java/org/apache/bigtop/bigpetstore/util/BigPetStoreConstants.java
----------------------------------------------------------------------
diff --git a/bigtop-bigpetstore/src/main/java/org/apache/bigtop/bigpetstore/util/BigPetStoreConstants.java b/bigtop-bigpetstore/src/main/java/org/apache/bigtop/bigpetstore/util/BigPetStoreConstants.java
index 29f7c67..01a6b95 100755
--- a/bigtop-bigpetstore/src/main/java/org/apache/bigtop/bigpetstore/util/BigPetStoreConstants.java
+++ b/bigtop-bigpetstore/src/main/java/org/apache/bigtop/bigpetstore/util/BigPetStoreConstants.java
@@ -6,13 +6,13 @@
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
  * http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
- * 
+ *
  * Static final constants
  *
  * is useful to have the basic sql here as the HIVE SQL can vary between hive
@@ -24,13 +24,18 @@ package org.apache.bigtop.bigpetstore.util;
 public class BigPetStoreConstants {
 
    //Files should be stored in graphviz arch.dot
-   public enum OUTPUTS{
+   public static enum OUTPUTS {
         generated,//generator
         cleaned,//pig
+        tsv,
         pig_ad_hoc_script,
-        MAHOUT_CF_IN,//hive view over data for mahout
-        MAHOUT_CF_OUT,//mahout cf results
-        CUSTOMER_PAGE//crunchhh
+        CUSTOMER_PAGE; //crunchhh
+
+        public static enum MahoutPaths {
+          Mahout,
+          AlsFactorization,
+          AlsRecommendations
+        }
     };
 
 }

http://git-wip-us.apache.org/repos/asf/bigtop/blob/4fca4573/bigtop-bigpetstore/src/main/java/org/apache/bigtop/bigpetstore/util/NumericalIdUtils.java
----------------------------------------------------------------------
diff --git a/bigtop-bigpetstore/src/main/java/org/apache/bigtop/bigpetstore/util/NumericalIdUtils.java b/bigtop-bigpetstore/src/main/java/org/apache/bigtop/bigpetstore/util/NumericalIdUtils.java
index 9fa9455..c652beb 100644
--- a/bigtop-bigpetstore/src/main/java/org/apache/bigtop/bigpetstore/util/NumericalIdUtils.java
+++ b/bigtop-bigpetstore/src/main/java/org/apache/bigtop/bigpetstore/util/NumericalIdUtils.java
@@ -6,7 +6,7 @@
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
  * http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -16,16 +16,14 @@
 
 package org.apache.bigtop.bigpetstore.util;
 
-import java.math.BigInteger;
-
-import org.apache.bigtop.bigpetstore.generator.TransactionIteratorFactory.STATE;
+import org.apache.bigtop.bigpetstore.generator.util.State;
 
 /**
  * User and Product IDs need numerical
  * identifiers for recommender algorithms
  * which attempt to interpolate new
  * products.
- * 
+ *
  * TODO: Delete this class. Its not necessarily required: We might just use HIVE HASH() as our
  * standard for this.
  */
@@ -34,7 +32,7 @@ public class NumericalIdUtils {
     /**
      * People: Leading with ordinal code for state.
      */
-    public static long toId(STATE state, String name){
+    public static long toId(State state, String name){
         String fromRawData =
                 state==null?
                         name:

http://git-wip-us.apache.org/repos/asf/bigtop/blob/4fca4573/bigtop-bigpetstore/src/main/java/org/apache/bigtop/bigpetstore/util/Pair.java
----------------------------------------------------------------------
diff --git a/bigtop-bigpetstore/src/main/java/org/apache/bigtop/bigpetstore/util/Pair.java b/bigtop-bigpetstore/src/main/java/org/apache/bigtop/bigpetstore/util/Pair.java
deleted file mode 100644
index a96fa44..0000000
--- a/bigtop-bigpetstore/src/main/java/org/apache/bigtop/bigpetstore/util/Pair.java
+++ /dev/null
@@ -1,125 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.bigtop.bigpetstore.util;
-
-import org.apache.bigtop.bigpetstore.generator.TransactionIteratorFactory;
-
-import java.util.Comparator;
-
-@Deprecated
-public class Pair<S, T> implements Comparable<Pair<S, T>> {
-
-  private final S first;
-  private final T second;
-
-  public Pair(final S car, final T cdr) {
-    first = car;
-    second = cdr;
-  }
-
-  public S getFirst() { return first; }
-  public T getSecond() { return second; }
-
-  @Override
-  public boolean equals(Object o) {
-    if (null == o) {
-      return false;
-    } else if (o instanceof Pair) {
-      Pair<S, T> p = (Pair<S, T>) o;
-      if (first == null && second == null) {
-        return p.first == null && p.second == null;
-      } else if (first == null) {
-        return p.first == null && second.equals(p.second);
-      } else if (second == null) {
-        return p.second == null && first.equals(p.first);
-      } else {
-        return first.equals(p.first) && second.equals(p.second);
-      }
-    } else {
-      return false;
-    }
-  }
-
-  @Override
-  public int hashCode() {
-    int code = 0;
-
-    if (null != first) {
-      code += first.hashCode();
-    }
-
-    if (null != second) {
-      code += second.hashCode() << 1;
-    }
-
-    return code;
-  }
-
-  @Override
-  public int compareTo(Pair<S, T> p) {
-    if (null == p) {
-      return 1;
-    }
-
-    Comparable<S> firstCompare = (Comparable<S>) first;
-
-    int firstResult = firstCompare.compareTo(p.first);
-    if (firstResult == 0) {
-      Comparable<T> secondCompare = (Comparable<T>) second;
-      return secondCompare.compareTo(p.second);
-    } else {
-      return firstResult;
-    }
-  }
-
-  // TODO: Can this be made static? Same with SecondElemComparator?
-  public class FirstElemComparator implements Comparator<Pair<S, T>> {
-    public FirstElemComparator() {
-    }
-
-    public int compare(Pair<S, T> p1, Pair<S, T> p2) {
-      Comparable<S> cS = (Comparable<S>) p1.first;
-      return cS.compareTo(p2.first);
-    }
-  }
-
-  public class SecondElemComparator implements Comparator<Pair<S, T>> {
-    public SecondElemComparator() {
-    }
-
-    public int compare(Pair<S, T> p1, Pair<S, T> p2) {
-      Comparable<T> cT = (Comparable<T>) p1.second;
-      return cT.compareTo(p2.second);
-    }
-  }
-
-  @Override
-  public String toString() {
-    String firstString = "null";
-    String secondString = "null";
-
-    if (null != first) {
-      firstString = first.toString();
-    }
-
-    if (null != second) {
-      secondString = second.toString();
-    }
-
-    return "(" + firstString + ", " + secondString + ")";
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/bigtop/blob/4fca4573/bigtop-bigpetstore/src/main/scala/org/apache/bigtop/bigpetstore/generator/DataForger.scala
----------------------------------------------------------------------
diff --git a/bigtop-bigpetstore/src/main/scala/org/apache/bigtop/bigpetstore/generator/DataForger.scala b/bigtop-bigpetstore/src/main/scala/org/apache/bigtop/bigpetstore/generator/DataForger.scala
new file mode 100644
index 0000000..de9b29b
--- /dev/null
+++ b/bigtop-bigpetstore/src/main/scala/org/apache/bigtop/bigpetstore/generator/DataForger.scala
@@ -0,0 +1,263 @@
+package org.apache.bigtop.bigpetstore.generator
+
+import java.util.Random
+import org.jfairy.Fairy
+import java.util.Date
+
+
+/**
+ * Generic class for generating random data. This class was created so
+ * that we can provide a uniform API for getting random data. If we want,
+ * we can replace the underlying data-generation implementation using
+ * existing libraries.
+ */
+object DataForger {
+  private val random = new Random
+  private val fairy = Fairy.create()
+
+  // TODO: Jay / Bhashit : refactor to use a random data generator?
+  def firstName(random: Random) = firstNames(random.nextInt(firstNames.length))
+  def firstName: String = firstName(random)
+
+  // TODO: Jay / Bhashit : refactor to use a random data generator?
+  def lastName(random: Random) = lastNames(random.nextInt(lastNames.length))
+  def lastName: String = lastName(random)
+
+  def randomDateInPastYears(maxYearsEarlier: Int) = fairy.dateProducer().randomDateInThePast(maxYearsEarlier).toDate()
+
+  private val firstNames =  IndexedSeq("Aaron", "Abby", "Abigail", "Adam",
+          "Alan", "Albert", "Alex", "Alexandra", "Alexis", "Alice", "Alicia",
+          "Alisha", "Alissa", "Allen", "Allison", "Alyssa", "Amanda", "Amber",
+          "Amy", "Andrea", "Andrew", "Andy", "Angel", "Angela", "Angie",
+          "Anita", "Ann", "Anna", "Annette", "Anthony", "Antonio", "April",
+          "Arthur", "Ashley", "Audrey", "Austin", "Autumn", "Baby", "Barb",
+          "Barbara", "Becky", "Benjamin", "Beth", "Bethany", "Betty",
+          "Beverly", "Bill", "Billie", "Billy", "Blake", "Bob", "Bobbie",
+          "Bobby", "Bonnie", "Brad", "Bradley", "Brady", "Brandi", "Brandon",
+          "Brandy", "Breanna", "Brenda", "Brent", "Brett", "Brian", "Brianna",
+          "Brittany", "Brooke", "Brooklyn", "Bruce", "Bryan", "Caleb",
+          "Cameron", "Candy", "Carl", "Carla", "Carmen", "Carol", "Carolyn",
+          "Carrie", "Casey", "Cassandra", "Catherine", "Cathy", "Chad",
+          "Charlene", "Charles", "Charlie", "Charlotte", "Chase", "Chasity",
+          "Chastity", "Chelsea", "Cheryl", "Chester", "Cheyenne", "Chris",
+          "Christian", "Christina", "Christine", "Christoph", "Christopher",
+          "Christy", "Chuck", "Cindy", "Clara", "Clarence", "Clayton",
+          "Clifford", "Clint", "Cody", "Colton", "Connie", "Corey", "Cory",
+          "Courtney", "Craig", "Crystal", "Curtis", "Cynthia", "Dakota",
+          "Dale", "Dallas", "Dalton", "Dan", "Dana", "Daniel", "Danielle",
+          "Danny", "Darla", "Darlene", "Darrell", "Darren", "Dave", "David",
+          "Dawn", "Dean", "Deanna", "Debbie", "Deborah", "Debra", "Denise",
+          "Dennis", "Derek", "Derrick", "Destiny", "Devin", "Diana", "Diane",
+          "Dillon", "Dixie", "Dominic", "Don", "Donald", "Donna", "Donnie",
+          "Doris", "Dorothy", "Doug", "Douglas", "Drew", "Duane", "Dustin",
+          "Dusty", "Dylan", "Earl", "Ed", "Eddie", "Edward", "Elaine",
+          "Elizabeth", "Ellen", "Emily", "Eric", "Erica", "Erika", "Erin",
+          "Ernest", "Ethan", "Eugene", "Eva", "Evelyn", "Everett", "Faith",
+          "Father", "Felicia", "Floyd", "Francis", "Frank", "Fred", "Gabriel",
+          "Gage", "Gail", "Gary", "Gene", "George", "Gerald", "Gina", "Ginger",
+          "Glen", "Glenn", "Gloria", "Grace", "Greg", "Gregory", "Haley",
+          "Hannah", "Harley", "Harold", "Harry", "Heath", "Heather", "Heidi",
+          "Helen", "Herbert", "Holly", "Hope", "Howard", "Hunter", "Ian",
+          "Isaac", "Jack", "Jackie", "Jacob", "Jade", "Jake", "James", "Jamie",
+          "Jan", "Jane", "Janet", "Janice", "Jared", "Jasmine", "Jason", "Jay",
+          "Jean", "Jeannie", "Jeff", "Jeffery", "Jeffrey", "Jenna", "Jennifer",
+          "Jenny", "Jeremiah", "Jeremy", "Jerry", "Jesse", "Jessica", "Jessie",
+          "Jill", "Jim", "Jimmy", "Joann", "Joanne", "Jodi", "Jody", "Joe",
+          "Joel", "Joey", "John", "Johnathan", "Johnny", "Jon", "Jonathan",
+          "Jonathon", "Jordan", "Joseph", "Josh", "Joshua", "Joyce", "Juanita",
+          "Judy", "Julia", "Julie", "Justin", "Kaitlyn", "Karen", "Katelyn",
+          "Katherine", "Kathleen", "Kathryn", "Kathy", "Katie", "Katrina",
+          "Kay", "Kayla", "Kaylee", "Keith", "Kelly", "Kelsey", "Ken",
+          "Kendra", "Kenneth", "Kenny", "Kevin", "Kim", "Kimberly", "Kris",
+          "Krista", "Kristen", "Kristin", "Kristina", "Kristy", "Kyle",
+          "Kylie", "Lacey", "Laken", "Lance", "Larry", "Laura", "Lawrence",
+          "Leah", "Lee", "Leonard", "Leroy", "Leslie", "Levi", "Lewis",
+          "Linda", "Lindsay", "Lindsey", "Lisa", "Lloyd", "Logan", "Lois",
+          "Loretta", "Lori", "Louis", "Lynn", "Madison", "Mandy", "Marcus",
+          "Margaret", "Maria", "Mariah", "Marie", "Marilyn", "Marion", "Mark",
+          "Marlene", "Marsha", "Martha", "Martin", "Marty", "Marvin", "Mary",
+          "Mary ann", "Mason", "Matt", "Matthew", "Max", "Megan", "Melanie",
+          "Melinda", "Melissa", "Melody", "Michael", "Michelle", "Mickey",
+          "Mike", "Mindy", "Miranda", "Misty", "Mitchell", "Molly", "Monica",
+          "Morgan", "Mother", "Myron", "Nancy", "Natasha", "Nathan",
+          "Nicholas", "Nick", "Nicole", "Nina", "Noah", "Norma", "Norman",
+          "Olivia", "Paige", "Pam", "Pamela", "Pat", "Patricia", "Patrick",
+          "Patty", "Paul", "Paula", "Peggy", "Penny", "Pete", "Phillip",
+          "Phyllis", "Rachael", "Rachel", "Ralph", "Randall", "Randi", "Randy",
+          "Ray", "Raymond", "Rebecca", "Regina", "Renee", "Rex", "Rhonda",
+          "Richard", "Rick", "Ricky", "Rita", "Rob", "Robbie", "Robert",
+          "Roberta", "Robin", "Rochelle", "Rocky", "Rod", "Rodney", "Roger",
+          "Ron", "Ronald", "Ronda", "Ronnie", "Rose", "Roxanne", "Roy", "Russ",
+          "Russell", "Rusty", "Ruth", "Ryan", "Sabrina", "Sally", "Sam",
+          "Samantha", "Samuel", "Sandra", "Sandy", "Sara", "Sarah", "Savannah",
+          "Scott", "Sean", "Seth", "Shanda", "Shane", "Shanna", "Shannon",
+          "Sharon", "Shaun", "Shawn", "Shawna", "Sheila", "Shelly", "Sher",
+          "Sherri", "Sherry", "Shirley", "Sierra", "Skyler", "Stacey", "Stacy",
+          "Stanley", "Stephanie", "Stephen", "Steve", "Steven", "Sue",
+          "Summer", "Susan", "Sydney", "Tabatha", "Tabitha", "Tamara", "Tammy",
+          "Tara", "Tasha", "Tashia", "Taylor", "Ted", "Teresa", "Terri",
+          "Terry", "Tessa", "Thelma", "Theresa", "Thomas", "Tia", "Tiffany",
+          "Tim", "Timmy", "Timothy", "Tina", "Todd", "Tom", "Tommy", "Toni",
+          "Tony", "Tonya", "Tracey", "Tracie", "Tracy", "Travis", "Trent",
+          "Trevor", "Trey", "Trisha", "Tristan", "Troy", "Tyler", "Tyrone",
+          "Unborn", "Valerie", "Vanessa", "Vernon", "Veronica", "Vicki",
+          "Vickie", "Vicky", "Victor", "Victoria", "Vincent", "Virginia",
+          "Vivian", "Walter", "Wanda", "Wayne", "Wendy", "Wesley", "Whitney",
+          "William", "Willie", "Wyatt", "Zachary")
+
+  private val lastNames = IndexedSeq("Abbott", "Acevedo", "Acosta", "Adams",
+          "Adkins", "Aguilar", "Aguirre", "Albert", "Alexander", "Alford",
+          "Allen", "Allison", "Alston", "Alvarado", "Alvarez", "Anderson",
+          "Andrews", "Anthony", "Armstrong", "Arnold", "Ashley", "Atkins",
+          "Atkinson", "Austin", "Avery", "Avila", "Ayala", "Ayers", "Bailey",
+          "Baird", "Baker", "Baldwin", "Ball", "Ballard", "Banks", "Barber",
+          "Smith", "Johnson", "Williams", "Jones", "Brown", "Davis", "Miller",
+          "Wilson", "Moore", "Taylor", "Thomas", "Jackson", "Barker", "Barlow",
+          "Barnes", "Barnett", "Barr", "Barrera", "Barrett", "Barron", "Barry",
+          "Bartlett", "Barton", "Bass", "Bates", "Battle", "Bauer", "Baxter",
+          "Beach", "Bean", "Beard", "Beasley", "Beck", "Becker", "Bell",
+          "Bender", "Benjamin", "Bennett", "Benson", "Bentley", "Benton",
+          "Berg", "Berger", "Bernard", "Berry", "Best", "Bird", "Bishop",
+          "Black", "Blackburn", "Blackwell", "Blair", "Blake", "Blanchard",
+          "Blankenship", "Blevins", "Bolton", "Bond", "Bonner", "Booker",
+          "Boone", "Booth", "Bowen", "Bowers", "Bowman", "Boyd", "Boyer",
+          "Boyle", "Bradford", "Bradley", "Bradshaw", "Brady", "Branch",
+          "Bray", "Brennan", "Brewer", "Bridges", "Briggs", "Bright", "Britt",
+          "Brock", "Brooks", "Browning", "Bruce", "Bryan", "Bryant",
+          "Buchanan", "Buck", "Buckley", "Buckner", "Bullock", "Burch",
+          "Burgess", "Burke", "Burks", "Burnett", "Burns", "Burris", "Burt",
+          "Burton", "Bush", "Butler", "Byers", "Byrd", "Cabrera", "Cain",
+          "Calderon", "Caldwell", "Calhoun", "Callahan", "Camacho", "Cameron",
+          "Campbell", "Campos", "Cannon", "Cantrell", "Cantu", "Cardenas",
+          "Carey", "Carlson", "Carney", "Carpenter", "Carr", "Carrillo",
+          "Carroll", "Carson", "Carter", "Carver", "Case", "Casey", "Cash",
+          "Castaneda", "Castillo", "Castro", "Cervantes", "Chambers", "Chan",
+          "Chandler", "Chaney", "Chang", "Chapman", "Charles", "Chase",
+          "Chavez", "Chen", "Cherry", "Christensen", "Christian", "Church",
+          "Clark", "Clarke", "Clay", "Clayton", "Clements", "Clemons",
+          "Cleveland", "Cline", "Cobb", "Cochran", "Coffey", "Cohen", "Cole",
+          "Coleman", "Collier", "Collins", "Colon", "Combs", "Compton",
+          "Conley", "Conner", "Conrad", "Contreras", "Conway", "Cook", "Cooke",
+          "Cooley", "Cooper", "Copeland", "Cortez", "Cote", "Cotton", "Cox",
+          "Craft", "Craig", "Crane", "Crawford", "Crosby", "Cross", "Cruz",
+          "Cummings", "Cunningham", "Curry", "Curtis", "Dale", "Dalton",
+          "Daniel", "Daniels", "Daugherty", "Davenport", "David", "Davidson",
+          "Dawson", "Day", "Dean", "Decker", "Dejesus", "Delacruz", "Delaney",
+          "Deleon", "Delgado", "Dennis", "Diaz", "Dickerson", "Dickinson",
+          "Dillard", "Dillon", "Dixon", "Dodson", "Dominguez", "Donaldson",
+          "Donovan", "Dorsey", "Dotson", "Douglas", "Downs", "Doyle", "Drake",
+          "Dudley", "Duffy", "Duke", "Duncan", "Dunlap", "Dunn", "Duran",
+          "Durham", "Dyer", "Eaton", "Edwards", "Elliott", "Ellis", "Ellison",
+          "Emerson", "England", "English", "Erickson", "Espinoza", "Estes",
+          "Estrada", "Evans", "Everett", "Ewing", "Farley", "Farmer",
+          "Farrell", "Faulkner", "Ferguson", "Fernandez", "Ferrell", "Fields",
+          "Figueroa", "Finch", "Finley", "Fischer", "Fisher", "Fitzgerald",
+          "Fitzpatrick", "Fleming", "Fletcher", "Flores", "Flowers", "Floyd",
+          "Flynn", "Foley", "Forbes", "Ford", "Foreman", "Foster", "Fowler",
+          "Fox", "Francis", "Franco", "Frank", "Franklin", "Franks", "Frazier",
+          "Frederick", "Freeman", "French", "Frost", "Fry", "Frye", "Fuentes",
+          "Fuller", "Fulton", "Gaines", "Gallagher", "Gallegos", "Galloway",
+          "Gamble", "Garcia", "Gardner", "Garner", "Garrett", "Garrison",
+          "Garza", "Gates", "Gay", "Gentry", "George", "Gibbs", "Gibson",
+          "Gilbert", "Giles", "Gill", "Gillespie", "Gilliam", "Gilmore",
+          "Glass", "Glenn", "Glover", "Goff", "Golden", "Gomez", "Gonzales",
+          "Gonzalez", "Good", "Goodman", "Goodwin", "Gordon", "Gould",
+          "Graham", "Grant", "Graves", "Gray", "Green", "Greene", "Greer",
+          "Gregory", "Griffin", "Griffith", "Grimes", "Gross", "Guerra",
+          "Guerrero", "Guthrie", "Gutierrez", "Guy", "Guzman", "Hahn", "Hale",
+          "Haley", "Hall", "Hamilton", "Hammond", "Hampton", "Hancock",
+          "Haney", "Hansen", "Hanson", "Hardin", "Harding", "Hardy", "Harmon",
+          "Harper", "Harris", "Harrington", "Harrison", "Hart", "Hartman",
+          "Harvey", "Hatfield", "Hawkins", "Hayden", "Hayes", "Haynes", "Hays",
+          "Head", "Heath", "Hebert", "Henderson", "Hendricks", "Hendrix",
+          "Henry", "Hensley", "Henson", "Herman", "Hernandez", "Herrera",
+          "Herring", "Hess", "Hester", "Hewitt", "Hickman", "Hicks", "Higgins",
+          "Hill", "Hines", "Hinton", "Hobbs", "Hodge", "Hodges", "Hoffman",
+          "Hogan", "Holcomb", "Holden", "Holder", "Holland", "Holloway",
+          "Holman", "Holmes", "Holt", "Hood", "Hooper", "Hoover", "Hopkins",
+          "Hopper", "Horn", "Horne", "Horton", "House", "Houston", "Howard",
+          "Howe", "Howell", "Hubbard", "Huber", "Hudson", "Huff", "Huffman",
+          "Hughes", "Hull", "Humphrey", "Hunt", "Hunter", "Hurley", "Hurst",
+          "Hutchinson", "Hyde", "Ingram", "Irwin", "Jacobs", "Jacobson",
+          "James", "Jarvis", "Jefferson", "Jenkins", "Jennings", "Jensen",
+          "Jimenez", "Johns", "Johnston", "Jordan", "Joseph", "Joyce",
+          "Joyner", "Juarez", "Justice", "Kane", "Kaufman", "Keith", "Keller",
+          "Kelley", "Kelly", "Kemp", "Kennedy", "Kent", "Kerr", "Key", "Kidd",
+          "Kim", "King", "Kinney", "Kirby", "Kirk", "Kirkland", "Klein",
+          "Kline", "Knapp", "Knight", "Knowles", "Knox", "Koch", "Kramer",
+          "Lamb", "Lambert", "Lancaster", "Landry", "Lane", "Lang", "Langley",
+          "Lara", "Larsen", "Larson", "Lawrence", "Lawson", "Le", "Leach",
+          "Leblanc", "Lee", "Leon", "Leonard", "Lester", "Levine", "Levy",
+          "Lewis", "Lindsay", "Lindsey", "Little", "Livingston", "Lloyd",
+          "Logan", "Long", "Lopez", "Lott", "Love", "Lowe", "Lowery", "Lucas",
+          "Luna", "Lynch", "Lynn", "Lyons", "Macdonald", "Macias", "Mack",
+          "Madden", "Maddox", "Maldonado", "Malone", "Mann", "Manning",
+          "Marks", "Marquez", "Marsh", "Marshall", "Martin", "Martinez",
+          "Mason", "Massey", "Mathews", "Mathis", "Matthews", "Maxwell", "May",
+          "Mayer", "Maynard", "Mayo", "Mays", "McBride", "McCall", "McCarthy",
+          "McCarty", "McClain", "McClure", "McConnell", "McCormick", "McCoy",
+          "McCray", "McCullough", "McDaniel", "McDonald", "McDowell",
+          "McFadden", "McFarland", "McGee", "McGowan", "McGuire", "McIntosh",
+          "McIntyre", "McKay", "McKee", "McKenzie", "McKinney", "McKnight",
+          "McLaughlin", "McLean", "McLeod", "McMahon", "McMillan", "McNeil",
+          "McPherson", "Meadows", "Medina", "Mejia", "Melendez", "Melton",
+          "Mendez", "Mendoza", "Mercado", "Mercer", "Merrill", "Merritt",
+          "Meyer", "Meyers", "Michael", "Middleton", "Miles", "Mills",
+          "Miranda", "Mitchell", "Molina", "Monroe", "Montgomery", "Montoya",
+          "Moody", "Moon", "Mooney", "Morales", "Moran", "Moreno", "Morgan",
+          "Morin", "Morris", "Morrison", "Morrow", "Morse", "Morton", "Moses",
+          "Mosley", "Moss", "Mueller", "Mullen", "Mullins", "Munoz", "Murphy",
+          "Murray", "Myers", "Nash", "Navarro", "Neal", "Nelson", "Newman",
+          "Newton", "Nguyen", "Nichols", "Nicholson", "Nielsen", "Nieves",
+          "Nixon", "Noble", "Noel", "Nolan", "Norman", "Norris", "Norton",
+          "Nunez", "Obrien", "Ochoa", "Oconnor", "Odom", "Odonnell", "Oliver",
+          "Olsen", "Olson", "O'neal", "O'neil", "O'neill", "Orr", "Ortega",
+          "Ortiz", "Osborn", "Osborne", "Owen", "Owens", "Pace", "Pacheco",
+          "Padilla", "Page", "Palmer", "Park", "Parker", "Parks", "Parrish",
+          "Parsons", "Pate", "Patel", "Patrick", "Patterson", "Patton", "Paul",
+          "Payne", "Pearson", "Peck", "Pena", "Pennington", "Perez", "Perkins",
+          "Perry", "Peters", "Petersen", "Peterson", "Petty", "Phelps",
+          "Phillips", "Pickett", "Pierce", "Pittman", "Pitts", "Pollard",
+          "Poole", "Pope", "Porter", "Potter", "Potts", "Powell", "Powers",
+          "Pratt", "Preston", "Price", "Prince", "Pruitt", "Puckett", "Pugh",
+          "Quinn", "Ramirez", "Ramos", "Ramsey", "Randall", "Randolph",
+          "Rasmussen", "Ratliff", "Ray", "Raymond", "Reed", "Reese", "Reeves",
+          "Reid", "Reilly", "Reyes", "Reynolds", "Rhodes", "Rice", "Rich",
+          "Richard", "Richards", "Richardson", "Richmond", "Riddle", "Riggs",
+          "Riley", "Rios", "Rivas", "Rivera", "Rivers", "Roach", "Robbins",
+          "Roberson", "Roberts", "Robertson", "Robinson", "Robles", "Rocha",
+          "Rodgers", "Rodriguez", "Rodriquez", "Rogers", "Rojas", "Rollins",
+          "Roman", "Romero", "Rosa", "Rosales", "Rosario", "Rose", "Ross",
+          "Roth", "Rowe", "Rowland", "Roy", "Ruiz", "Rush", "Russell", "Russo",
+          "Rutledge", "Ryan", "Salas", "Salazar", "Salinas", "Sampson",
+          "Sanchez", "Sanders", "Sandoval", "Sanford", "Santana", "Santiago",
+          "Santos", "Sargent", "Saunders", "Savage", "Sawyer", "Schmidt",
+          "Schneider", "Schroeder", "Schultz", "Schwartz", "Scott", "Sears",
+          "Sellers", "Serrano", "Sexton", "Shaffer", "Shannon", "Sharp",
+          "Sharpe", "Shaw", "Shelton", "Shepard", "Shepherd", "Sheppard",
+          "Sherman", "Shields", "Short", "Silva", "Simmons", "Simon",
+          "Simpson", "Sims", "Singleton", "Skinner", "Slater", "Sloan",
+          "Small", "Snider", "Snow", "Snyder", "Solis", "Solomon", "Sosa",
+          "Soto", "Sparks", "Spears", "Spence", "Spencer", "Stafford",
+          "Stanley", "Stanton", "Stark", "Steele", "Stein", "Stephens",
+          "Stephenson", "Stevens", "Stevenson", "Stewart", "Stokes", "Stone",
+          "Stout", "Strickland", "Strong", "Stuart", "Suarez", "Sullivan",
+          "Summers", "Sutton", "Swanson", "Sweeney", "Sweet", "Sykes",
+          "Talley", "Tanner", "Tate", "Terrell", "Terry", "Thompson",
+          "Thornton", "Tillman", "Todd", "Torres", "Townsend", "Tran",
+          "Travis", "Trevino", "Trujillo", "Tucker", "Turner", "Tyler",
+          "Tyson", "Underwood", "Valdez", "Valencia", "Valentine",
+          "Valenzuela", "Vance", "Vang", "Vargas", "Vasquez", "Vaughan",
+          "Vaughn", "Vazquez", "Vega", "Velasquez", "Velazquez", "Velez",
+          "Van halen", "Vincent", "Vinson", "Wade", "Wagner", "Walker", "Wall",
+          "Wallace", "Waller", "Walls", "Walsh", "Walter", "Walters", "Walton",
+          "Ward", "Ware", "Warner", "Warren", "Washington", "Waters",
+          "Watkins", "Watson", "Watts", "Weaver", "Webb", "Weber", "Webster",
+          "Weeks", "Weiss", "Welch", "Wells", "West", "Wheeler", "Whitaker",
+          "White", "Whitehead", "Whitfield", "Whitley", "Whitney", "Wiggins",
+          "Wilcox", "Wilder", "Wiley", "Wilkerson", "Wilkins", "Wilkinson",
+          "William", "Williamson", "Willis", "Winters", "Wise", "Witt", "Wolf",
+          "Wolfe", "Wong", "Wood", "Woodard", "Woods", "Woodward", "Wooten",
+          "Workman", "Wright", "Wyatt", "Wynn", "Yang", "Yates", "York",
+          "Young", "Zamora", "Zimmerman")
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/bigtop/blob/4fca4573/bigtop-bigpetstore/src/main/scala/org/apache/bigtop/bigpetstore/generator/TransactionIteratorFactory.scala
----------------------------------------------------------------------
diff --git a/bigtop-bigpetstore/src/main/scala/org/apache/bigtop/bigpetstore/generator/TransactionIteratorFactory.scala b/bigtop-bigpetstore/src/main/scala/org/apache/bigtop/bigpetstore/generator/TransactionIteratorFactory.scala
new file mode 100644
index 0000000..9e70cca
--- /dev/null
+++ b/bigtop-bigpetstore/src/main/scala/org/apache/bigtop/bigpetstore/generator/TransactionIteratorFactory.scala
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bigtop.bigpetstore.generator;
+
+import java.util.Date
+import org.apache.bigtop.bigpetstore.generator.util.State
+import org.apache.commons.lang3.StringUtils
+import java.util.Arrays.asList
+import java.util.Random
+import scala.collection.Iterator
+import com.sun.org.apache.xml.internal.serializer.ToStream
+import java.util.{Iterator => JavaIterator}
+import scala.collection.JavaConversions.asJavaIterator
+import org.apache.bigtop.bigpetstore.generator.util.Product
+import org.apache.commons.lang3.Range;
+import org.apache.bigtop.bigpetstore.generator.util.ProductType
+
+/**
+ * This class generates our data. Over time we will use it to embed bias which
+ * can then be teased out, i.e. by clustering/classifiers. For example:
+ *
+ * certain products <--> certain years or days
+ */
+class TransactionIteratorFactory(private val records: Int,
+        private val customerIdRange: Range[java.lang.Long],
+        private val state: State) {
+  assert(records > 0, "Number of records must be greater than 0 to generate a data iterator!")
+  private val random = new Random(state.hashCode)
+
+  def data: JavaIterator[TransactionIteratorFactory.KeyVal[String, String]] = {
+    new TransactionIteratorFactory.DataIterator(records, customerIdRange, state, random)
+  }
+}
+
+object TransactionIteratorFactory {
+  class KeyVal[K, V](val key: K, val value: V)
+
+  private class DataIterator(records: Int,
+          customerIdRange: Range[java.lang.Long],
+          state: State,
+          r: Random) extends Iterator[KeyVal[String, String]] {
+    private var firstName: String = null
+    private var lastName: String = null
+    private var elementsProcducedCount = 0
+    private var repeatCount = 0
+    private var currentCustomerId = customerIdRange.getMinimum
+    private var currentProductType = selectRandomProductType;
+
+    def hasNext =
+      elementsProcducedCount < records && currentCustomerId <= customerIdRange.getMaximum
+
+
+    def next(): TransactionIteratorFactory.KeyVal[String,String] = {
+      val date = DataForger.randomDateInPastYears(50);
+      setIteratorState();
+
+      val product = randomProductOfCurrentlySelectedType
+      val key = StringUtils.join(asList("BigPetStore", "storeCode_" + state.name(),
+              elementsProcducedCount.toString), ",")
+      val value = StringUtils.join(asList(currentCustomerId, firstName, lastName, product.id,
+              product.name.toLowerCase, product.price, date), ",")
+
+      elementsProcducedCount += 1
+      new TransactionIteratorFactory.KeyVal(key, value)
+    }
+
+    private def setIteratorState() = {
+      /** Some customers come back for more :) We repeat a customer a random number of times */
+      if (repeatCount > 0) {
+        repeatCount -= 1
+      } else {
+        firstName = DataForger.firstName(r)
+        lastName = DataForger.lastName(r)
+        // The scaled Gaussian is unbounded (it can exceed 10) and is negative about half the time;
+        // values <= 0 mean no repeat. A Gaussian isn't required: the count can be truly arbitrary.
+        repeatCount = (r.nextGaussian * 4f) toInt;
+        println("####Repeat: " + repeatCount)
+        currentCustomerId += 1
+        currentProductType = selectRandomProductType;
+      }
+    }
+
+    private def selectRandomProductType = {
+      ProductType.values.apply(r.nextInt(ProductType.values.length))
+    }
+
+    private def randomProductOfCurrentlySelectedType = {
+      currentProductType.getProducts.get(r.nextInt(currentProductType.getProducts.size))
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/bigtop/blob/4fca4573/bigtop-bigpetstore/src/test/java/org/apache/bigtop/bigpetstore/generator/TestNumericalIdUtils.java
----------------------------------------------------------------------
diff --git a/bigtop-bigpetstore/src/test/java/org/apache/bigtop/bigpetstore/generator/TestNumericalIdUtils.java b/bigtop-bigpetstore/src/test/java/org/apache/bigtop/bigpetstore/generator/TestNumericalIdUtils.java
index 52b8079..e2f1f25 100644
--- a/bigtop-bigpetstore/src/test/java/org/apache/bigtop/bigpetstore/generator/TestNumericalIdUtils.java
+++ b/bigtop-bigpetstore/src/test/java/org/apache/bigtop/bigpetstore/generator/TestNumericalIdUtils.java
@@ -6,7 +6,7 @@
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
  * http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -17,7 +17,7 @@ package org.apache.bigtop.bigpetstore.generator;
 
 import static org.junit.Assert.assertFalse;
 
-import org.apache.bigtop.bigpetstore.generator.TransactionIteratorFactory.STATE;
+import org.apache.bigtop.bigpetstore.generator.util.State;
 import org.apache.bigtop.bigpetstore.util.NumericalIdUtils;
 import org.junit.Test;
 
@@ -25,9 +25,9 @@ public class TestNumericalIdUtils {
 
     @Test
     public void testName() {
-        String strId= STATE.OK.name()+"_"+ "jay vyas";
+        String strId= State.OK.name()+"_"+ "jay vyas";
         long id = NumericalIdUtils.toId(strId);
-        String strId2= STATE.CO.name()+"_"+ "jay vyas";
+        String strId2= State.CO.name()+"_"+ "jay vyas";
         long id2 = NumericalIdUtils.toId(strId2);
         System.out.println(id + " " + id2);
         assertFalse(id==id2);

http://git-wip-us.apache.org/repos/asf/bigtop/blob/4fca4573/bigtop-bigpetstore/src/test/java/org/apache/bigtop/bigpetstore/generator/TestPetStoreTransactionGeneratorJob.java
----------------------------------------------------------------------
diff --git a/bigtop-bigpetstore/src/test/java/org/apache/bigtop/bigpetstore/generator/TestPetStoreTransactionGeneratorJob.java b/bigtop-bigpetstore/src/test/java/org/apache/bigtop/bigpetstore/generator/TestPetStoreTransactionGeneratorJob.java
index d68e36c..76de3d0 100755
--- a/bigtop-bigpetstore/src/test/java/org/apache/bigtop/bigpetstore/generator/TestPetStoreTransactionGeneratorJob.java
+++ b/bigtop-bigpetstore/src/test/java/org/apache/bigtop/bigpetstore/generator/TestPetStoreTransactionGeneratorJob.java
@@ -6,7 +6,7 @@
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
  * http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -24,7 +24,7 @@ import java.io.InputStreamReader;
 import java.util.Date;
 
 import org.apache.bigtop.bigpetstore.generator.BPSGenerator.props;
-import org.apache.bigtop.bigpetstore.generator.TransactionIteratorFactory.STATE;
+import org.apache.bigtop.bigpetstore.generator.util.State;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -61,7 +61,7 @@ public class TestPetStoreTransactionGeneratorJob {
          * Run the job
          */
         Path output = new Path("petstoredata/" + (new Date()).toString());
-        Job createInput = BPSGenerator.createJob(output, c);
+        Job createInput = BPSGenerator.getCreateTransactionRecordsJob(output, c);
         createInput.submit();
         System.out.println(createInput);
         createInput.waitForCompletion(true);
@@ -83,10 +83,10 @@ public class TestPetStoreTransactionGeneratorJob {
             s = br.readLine();
             System.out.println("===>" + s);
             recordsSeen++;
-            if (s.contains(STATE.CT.name())) {
+            if (s.contains(State.CT.name())) {
                 CTseen = true;
             }
-            if (s.contains(STATE.AZ.name())) {
+            if (s.contains(State.AZ.name())) {
                 AZseen = true;
             }
         }


[2/2] git commit: BIGTOP-1272: Productionize the mahout recommender

Posted by ja...@apache.org.
BIGTOP-1272: Productionize the mahout recommender

Signed-off-by: jay@apache.org <jayunit100>


Project: http://git-wip-us.apache.org/repos/asf/bigtop/repo
Commit: http://git-wip-us.apache.org/repos/asf/bigtop/commit/4fca4573
Tree: http://git-wip-us.apache.org/repos/asf/bigtop/tree/4fca4573
Diff: http://git-wip-us.apache.org/repos/asf/bigtop/diff/4fca4573

Branch: refs/heads/master
Commit: 4fca4573b388714e305cd365f4a9bed4f4c17e8c
Parents: e9771e6
Author: bhashit parikh <bh...@gmail.com>
Authored: Thu May 29 11:07:15 2014 +0530
Committer: jay@apache.org <jayunit100>
Committed: Tue Aug 19 23:09:35 2014 -0400

----------------------------------------------------------------------
 bigtop-bigpetstore/BPS_analytics.pig            |  10 +-
 bigtop-bigpetstore/README.md                    |  68 ++-
 bigtop-bigpetstore/arch.dot                     |  27 +-
 bigtop-bigpetstore/build.gradle                 | 327 +++++++------
 .../bigtop/bigpetstore/BigPetStoreMahoutIT.java |  73 +++
 .../bigtop/bigpetstore/BigPetStorePigIT.java    |  68 +--
 .../org/apache/bigtop/bigpetstore/ITUtils.java  |  92 ++--
 .../bigtop/bigpetstore/etl/PigCSVCleaner.java   |  71 ++-
 .../bigpetstore/generator/BPSGenerator.java     | 110 ++---
 .../generator/CustomerGenerator.scala           |  80 ++++
 ...GeneratePetStoreTransactionsInputFormat.java | 134 ------
 .../PetStoreTransactionInputSplit.java          |  28 +-
 .../PetStoreTransactionsInputFormat.java        | 139 ++++++
 .../generator/TransactionIteratorFactory.java   | 468 -------------------
 .../bigpetstore/generator/util/Product.java     |  63 +++
 .../bigpetstore/generator/util/ProductType.java |  29 ++
 .../bigpetstore/generator/util/State.java       |  26 ++
 .../bigpetstore/recommend/ItemRecommender.scala | 103 ++++
 .../bigpetstore/util/BigPetStoreConstants.java  |  17 +-
 .../bigpetstore/util/NumericalIdUtils.java      |  10 +-
 .../apache/bigtop/bigpetstore/util/Pair.java    | 125 -----
 .../bigpetstore/generator/DataForger.scala      | 263 +++++++++++
 .../generator/TransactionIteratorFactory.scala  | 104 +++++
 .../generator/TestNumericalIdUtils.java         |   8 +-
 .../TestPetStoreTransactionGeneratorJob.java    |  10 +-
 25 files changed, 1347 insertions(+), 1106 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/bigtop/blob/4fca4573/bigtop-bigpetstore/BPS_analytics.pig
----------------------------------------------------------------------
diff --git a/bigtop-bigpetstore/BPS_analytics.pig b/bigtop-bigpetstore/BPS_analytics.pig
index 44ed541..23e3749 100755
--- a/bigtop-bigpetstore/BPS_analytics.pig
+++ b/bigtop-bigpetstore/BPS_analytics.pig
@@ -38,14 +38,16 @@ csvdata =
           dump:chararray,
           state:chararray,
           transaction:int,
+          custId:long,
           fname:chararray,
           lname:chararray,
-          date:chararray,
+          productId:int,
+          product:chararray,
           price:float,
-          product:chararray);
+          date:chararray);
 
 -- RESULT:
--- (BigPetStore,storeCode_AK,1,jay,guy,Thu Dec 18 12:17:10 EST 1969,10.5,dog-food)
+-- (BigPetStore,storeCode_AK,1,11,jay,guy,3,dog-food,10.5,Thu Dec 18 12:17:10 EST 1969)
 -- ...
 
 -- Okay! Now lets group our data so we can do some stats.
@@ -55,7 +57,7 @@ csvdata =
 state_product = group csvdata by ( state, product ) ;
 
 -- RESULT
--- ((storeCode_AK,dog-food) , {(BigPetStore,storeCode_AK,1,jay,guy,Thu Dec 18 12:17:10 EST 1969,10.5,dog-food)}) --
+-- ((storeCode_AK,dog-food) , {(BigPetStore,storeCode_AK,1,11,jay,guy,3,dog-food,10.5,Thu Dec 18 12:17:10 EST 1969)}) --
 -- ...
 
 

http://git-wip-us.apache.org/repos/asf/bigtop/blob/4fca4573/bigtop-bigpetstore/README.md
----------------------------------------------------------------------
diff --git a/bigtop-bigpetstore/README.md b/bigtop-bigpetstore/README.md
index 40a9088..e58d8f3 100644
--- a/bigtop-bigpetstore/README.md
+++ b/bigtop-bigpetstore/README.md
@@ -13,7 +13,7 @@ Architecture
 The application consists of the following modules
 
 * generator: generates raw data on the dfs
-* clustering: Apache Mahout demo code for processing the data using Item based Collaborative Filtering. This feature is not supported yet. You can track its progress using this [`JIRA` issue](https://issues.apache.org/jira/browse/BIGTOP-1272)
+* recommendations: Apache Mahout demo code for generating recommendations by analyzing the transaction records. This feature can be tracked at this [`JIRA` issue](https://issues.apache.org/jira/browse/BIGTOP-1272)
 * Pig: demo code for processing the data using Apache Pig
 * Hive: demo code for processing the data using Apache Hive. This part is not complete yet. We are working on it. You can track it using this [`JIRA` issue](https://issues.apache.org/jira/browse/BIGTOP-1270)
 * Crunch: demo code for processing the data using Apache Crunch
@@ -21,22 +21,22 @@ The application consists of the following modules
 Build Instructions
 ------------------
 
-You'll need to have [`gradle`](http://www.gradle.org/downloads) installed and set-up correctly in order to follow along these instructions.
+You'll need to have version 2.0 of [`gradle`](http://www.gradle.org/downloads) installed and set up correctly in order to follow along with these instructions.
 We could have used the [`gradle-wrapper`](http://www.gradle.org/docs/current/userguide/gradle_wrapper.html) to avoid having to install `gradle`, but the `bigtop` project includes all `gradle*` directories in `.gitignore`. So, that's not going to work.
 
 ### Build the JAR
 
-  `gradle clean build` will build the bigpetstore `jar`. The `jar` will be located in the `build\libs` directory.
+`gradle clean build` will build the bigpetstore `jar`. The `jar` will be located in the `build\libs` directory.
 
 ### Run Intergration Tests With
   * Pig profile: `gradle clean integrationTest -P ITProfile=pig`
-  * Crunch profile: `gradle clean integrationTest -P ITProfile=crunch`
+  * Mahout Profile: `gradle clean integrationTest -P ITProfile=mahout`
+  * Crunch profile: Not Implemented Yet
   * Hive profile: Not implemented yet.
-  * Mahout profile: Not implemented yet.
 
 If you don't specify any profile-name, or if you specify an invalid-name for the `integrationTest` task, no integration tests will be run.
 
-*Note:* At this stage, only the `Pig` profile is working. Will continue to update this area as further work is completed.
+*Note:* At this stage, only the `Pig` and `Mahout` profiles are working. Will continue to update this area as further work is completed.
 
 For Eclipse Users
 -----------------
@@ -87,14 +87,61 @@ The next phase of the application processes the data to create basic aggregation
 
  - try it [on the gh-pages branch](http://jayunit100.github.io/bigpetstore/)
 
+
 Running on a hadoop cluster
 ---------------------------
 
-wget s3://bigpetstore/bigpetstore.jar
+*Note:* For running the code using the `hadoop jar` command instead of the `gradle` tasks, you will need to set the classpath appropriately. The discussion after [this comment][jira-mahout] in JIRA could also be useful apart from these instructions.
+
+### Build the fat-jar
+
+We are going to use a fat-jar in order to avoid specifying the entire classpath ourselves.
+
+The fat-jar is required when we are running the application on a hadoop cluster. The other way would be to specify all the dependencies (including the transitive ones) manually while running the hadoop job. Fat-jars make it easier to bundle almost all the dependencies inside the distribution jar itself.
+
+```
+gradle clean shadowJar -Pfor-cluster
+```
+
+This command will build the fat-jar with all the dependencies bundled in except the hadoop, mahout and pig dependencies, which we'll specify using `-libjars` option while running the hadoop job. These dependencies are excluded to avoid conflicts with the jars provided by hadoop itself.
+
+The generated jar will be inside the `build/libs` dir, with a name like `BigPetStore-x.x.x-SNAPSHOT-all.jar`. For the remaining discussion, I'll refer to this jar as `bps.jar`.
+
+### Get the mahout and pig jars
+
+You'll need both mahout and pig jars with the hadoop classes excluded. Commonly, you can find both of these in their respective distros. The required pig jar is generally named like `pig-x.x.x-withouthadoop.jar` and the mahout jar would be named like `mahout-core-job.jar`. If you want, you can build those yourself by following the instructions in [this JIRA comment][jira-mahout]. For the remaining discussion, I am going to refer to these two jars by `pig-withouthadoop.jar` and `mahout-core-job.jar`.
+
+### Setup the classpath for hadoop nodes in the cluster
+
+```
+export JARS="/usr/lib/pig/pig-withouthadoop.jar,/usr/lib/mahout/mahout-core-job.jar"
+```
+
+We also need these jars to be present on the client side to kick-off the jobs. Reusing the `JARS` variable to put the same jars on the client classpath.
+
+```
+export HADOOP_CLASSPATH=`echo $JARS | sed s/,/:/g`
+```
+
+### Generate the data
+
+```
+hadoop jar bps.jar org.apache.bigtop.bigpetstore.generator.BPSGenerator 1000000 bigpetstore/gen
+```
+
+### Clean with pig
+
+```
+hadoop jar bps.jar org.apache.bigtop.bigpetstore.etl.PigCSVCleaner -libjars $JARS bigpetstore/gen/ bigpetstore/ custom_pigscript.pig
+```
+
+### Analyze and generate recommendations with mahout
+
+```
+hadoop jar bps.jar org.apache.bigtop.bigpetstore.recommend.ItemRecommender -libjars $JARS  bigpetstore/pig/Mahout bigpetstore/Mahout/AlsFactorization bigpetstore/Mahout/AlsRecommendations
+```
 
-hadoop jar bigpetstore.jar org.apache.bigtop.bigpetstore.generator.BPSGenerator 1000000 bigpetstore/gen
 
-hadoop jar bigpetstore.jar org.apache.bigtop.bigpetstore.etl.PigCSVCleaner bigpetstore/gen/ bigpetstore/pig/ custom_pigscript.pig
 ... (will add more steps as we add more phases to the workflow) ...
 
 
@@ -134,3 +181,6 @@ of EMR setup w/ a custom script).
 ...
 
 And so on.
+
+
+[jira-mahout]: https://issues.apache.org/jira/browse/BIGTOP-1272?focusedCommentId=14076023&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-14076023

http://git-wip-us.apache.org/repos/asf/bigtop/blob/4fca4573/bigtop-bigpetstore/arch.dot
----------------------------------------------------------------------
diff --git a/bigtop-bigpetstore/arch.dot b/bigtop-bigpetstore/arch.dot
index 0f3f404..7d17c5a 100644
--- a/bigtop-bigpetstore/arch.dot
+++ b/bigtop-bigpetstore/arch.dot
@@ -6,7 +6,7 @@
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 * http://www.apache.org/licenses/LICENSE-2.0
-* 
+*
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -18,25 +18,24 @@ digraph bigpetstore {
    node [shape=record];
 
 
-   PROD_And_USER_HASH_FUNC [label="python or datafu udf" ,style="rounded,filled", shape=diamond];
+   BPSAnalytics [label="BPSAnalytics.pig" ,style="rounded, filled", shape=diamond];
    CUSTOMER_PAGE [label="CUSTOMER_PAGE|json|CUSTOMER_PAGE/part*"];
    DIRTY_CSV [label="DIRTY_CSV|fname   lname -prod , price ,prod,..|generated/part*"];
    CSV [label="CSV|fname,lname,prod,price,date,xcoord,ycoord,...|cleaned/part*"];
-   MAHOUT_VIEW_INPUT [label="MAHOUT_VIEW  |  (hashed name) 10001, (hashed purchases) 203 |  <hive_warehouse>/mahout_cf_in/part*" ];
-   MAHOUT_CF [label="MAHOUT collaborative filter output  | (hashed name) 10001, (hashed product) 201, .6 | mahout_cf_out/part*" ];
+   MAHOUT_VIEW_INPUT [label="MAHOUT_VIEW  |  (user-id) 10001  (product-id) 203  (implicit-rating) 1 |  cleaned/Mahout/part*" ];
+   MAHOUT_ALS [label="Parallel ALS Recommender output  | (user-id) 10001  [(product-id) 201: (recommendation-strength 0-1)0.546] | Mahout/AlsRecommendations/part*" ];
 
    Generate -> DIRTY_CSV [label="hadoop jar bigpetstore.jar org.bigtop.bigpetstore.generator.BPSGenerator 100 bps/generated/"] ;
-   DIRTY_CSV -> pig [label=""];
+   DIRTY_CSV -> pig [label="hadoop jar bigpetstore.jar org.bigtop.bigpetstore.etl.PigCSVCleaner bps/generated/ bps/cleaned/ "];
 
-   pig -> CSV [label="hadoop jar bigpetstore.jar org.bigtop.bigpetstore.etl.PigCSVCleaner bps/generated/ bps/cleaned/ "];
-   CSV -> MAHOUT_VIEW_INPUT [label="BPS_Mahout_Viewbuilder.pig"]; 
-   PROD_And_USER_HASH_FUNC -> MAHOUT_VIEW_INPUT [label="used in BPS_MAHOUT_Viewbuilder.pig script"] ;
+   pig -> CSV [label="pig query to clean up generated transaction records"];
+   pig -> MAHOUT_VIEW_INPUT [label="pig query to produce mahout input format"];
 
-   MAHOUT_VIEW_INPUT -> mahout;
-   mahout -> MAHOUT_CF [label="hadoop jar bigpetstore.jar org.bigtop.bigpetstore.analytics.BPSRecommender bps/mahout_cf_in/part* bps/mahout_cf_out/"];
+   MAHOUT_VIEW_INPUT -> ParallelALSFactorizationJob [label="hadoop jar bigpetstore.jar org.apache.bigtop.bigpetstore.recommend.ItemRecommender cleaned/Mahout Mahout/AlsFactorization Mahout/AlsRecommendations"];
+   ParallelALSFactorizationJob -> "Mahout RecommenderJob"
+   "Mahout RecommenderJob" -> MAHOUT_ALS
 
-   CSV -> pig_job2;   
-   MAHOUT_CF  -> pig_job2 ;
-   PROD_And_USER_HASH_FUNC -> pig_job2;
-   pig_job2  -> CUSTOMER_PAGE [label="hadoop jar bigpetstore.jar org.bigtop.bigpetstore.analytics.BPSRecommender bpg/cleaned/ bps/mahout_cf_out/"];
+   CSV -> BPSAnalytics;
+   BPSAnalytics  -> pig_job2;
+   pig_job2  -> CUSTOMER_PAGE [label=""];
 }

http://git-wip-us.apache.org/repos/asf/bigtop/blob/4fca4573/bigtop-bigpetstore/build.gradle
----------------------------------------------------------------------
diff --git a/bigtop-bigpetstore/build.gradle b/bigtop-bigpetstore/build.gradle
index efb69b3..a6a8c1a 100644
--- a/bigtop-bigpetstore/build.gradle
+++ b/bigtop-bigpetstore/build.gradle
@@ -3,6 +3,14 @@ apply plugin: "eclipse"
 // TODO add idea module config.
 apply plugin: "idea"
 apply plugin: "scala"
+apply plugin: 'com.github.johnrengelman.shadow'
+
+buildscript {
+  repositories { jcenter() }
+  dependencies {
+    classpath 'com.github.jengelman.gradle.plugins:shadow:1.0.2'
+  }
+}
 
 // Read the groupId and version properties from the "parent" bigtop project.
 // It would be better if there was some better way of doing this. Howvever,
@@ -10,9 +18,9 @@ apply plugin: "scala"
 // projects can't have maven projects as parents (AFAIK. If there is a way to do it,
 // it doesn't seem to be well-documented).
 def setProjectProperties() {
-	Node xml = new XmlParser().parse("../pom.xml")
-	group = xml.groupId.first().value().first()
-	version = xml.version.first().value().first()
+    Node xml = new XmlParser().parse("../pom.xml")
+    group = xml.groupId.first().value().first()
+    version = xml.version.first().value().first()
 }
 
 setProjectProperties()
@@ -27,40 +35,49 @@ targetCompatibility = 1.7
 
 // Specify any additional project properties.
 ext {
-	slf4jVersion = "1.7.5"
-	guavaVersion = "15.0"
-	hadoopVersion = "2.2.0"
-	datanucleusVersion = "3.2.2"
-	datanucleusJpaVersion = "3.2.1"
-	bonecpVersion = "0.8.0.RELEASE"
-	derbyVersion = "10.10.1.1"
+    slf4jVersion = "1.7.5"
+    guavaVersion = "15.0"
+    datanucleusVersion = "3.2.2"
+    datanucleusJpaVersion = "3.2.1"
+    bonecpVersion = "0.8.0.RELEASE"
+    derbyVersion = "10.10.1.1"
+
+    // From the Hortonworks repo. They compile mahout-core against hadoop 2.x; this
+    // mahout build is compiled against hadoop 2.4.0.
+    hadoopVersion = "2.4.0.2.1.2.0-402"
+    mahoutVersion = "0.9.0.2.1.2.0-402"
 }
 
 repositories {
-	mavenCentral()
+    mavenCentral()
+    maven {
+        url "http://repo.hortonworks.com/content/repositories/releases/"
+    }
 }
 
-tasks.withType(Compile) {
-	options.encoding = 'UTF-8'
-	options.compilerArgs << "-Xlint:all"
+tasks.withType(AbstractCompile) {
+    options.encoding = 'UTF-8'
+    options.compilerArgs << "-Xlint:all"
 }
 
 tasks.withType(ScalaCompile) {
-	// Enables incremental compilation.
-	// http://www.gradle.org/docs/current/userguide/userguide_single.html#N12F78
-	scalaCompileOptions.useAnt = false
+    // Enables incremental compilation.
+    // http://www.gradle.org/docs/current/userguide/userguide_single.html#N12F78
+    scalaCompileOptions.useAnt = false
 }
 
 tasks.withType(Test) {
-	testLogging {
-		// Uncomment this if you want to see the console output from the tests.
-		// showStandardStreams = true
-		events "passed", "skipped", "failed"
-	}
+    testLogging {
+        // Uncomment this if you want to see the console output from the tests.
+        // showStandardStreams = true
+        events "passed", "skipped", "failed"
+        // show standard out and standard error of the test JVM(s) on the console
+        //showStandardStreams = true
+    }
 }
 
 test {
-	exclude "**/*TestPig.java", "**/*TestHiveEmbedded.java", "**/*TestCrunch.java", "**/*TestPetStoreTransactionGeneratorJob.java"
+    exclude "**/*TestPig.java", "**/*TestHiveEmbedded.java", "**/*TestCrunch.java", "**/*TestPetStoreTransactionGeneratorJob.java"
 }
 
 // Create a separate source-set for the src/integrationTest set of classes. The convention here
@@ -68,19 +85,23 @@ test {
 // under the 'src' directory. So, in this case, it will look for a directory named 'src/integrationTest'
 // since the name of the source-set is 'integrationTest'
 sourceSets {
-	// The main and test source-sets are configured by both java and scala plugins. They contain
-	// all the src/main and src/test classes. The following statements make all of those classes
-	// available on the classpath for the integration-tests, for both java and scala.
-	integrationTest {
-		java {
-			compileClasspath += main.output + test.output
-			runtimeClasspath += main.output + test.output
-		}
-		scala {
-			compileClasspath += main.output + test.output
-			runtimeClasspath += main.output + test.output
-		}
-	}
+    main {
+        java.srcDirs = [];
+        scala.srcDirs = ["src/main/scala", "src/main/java"]
+    }
+    // The main and test source-sets are configured by both java and scala plugins. They contain
+    // all the src/main and src/test classes. The following statements make all of those classes
+    // available on the classpath for the integration-tests, for both java and scala.
+    integrationTest {
+        java {
+            compileClasspath += main.output + test.output
+            runtimeClasspath += main.output + test.output
+        }
+        scala {
+            compileClasspath += main.output + test.output
+            runtimeClasspath += main.output + test.output
+        }
+    }
 }
 
 // Creating a source-set automatically add a couple of corresponding configurations (when java/scala
@@ -91,120 +112,164 @@ sourceSets {
 // available for integrationTestRuntime. For ex. the testCompile configuration has a dependency on
 // jUnit and scalatest. This makes them available for the integration tests as well.
 configurations {
-	integrationTestCompile {
-		extendsFrom testCompile
-	}
+    integrationTestCompile {
+        extendsFrom testCompile
+    }
 
-	integrationTestRuntime {
-		extendsFrom integrationTestCompile, testRuntime
-	}
+    integrationTestRuntime {
+        extendsFrom integrationTestCompile, testRuntime
+    }
 }
 
 // To see the API that is being used here, consult the following docs
 // http://www.gradle.org/docs/current/dsl/org.gradle.api.artifacts.ResolutionStrategy.html
 def updateDependencyVersion(dependencyDetails, dependencyString) {
-	def parts = dependencyString.split(':')
-	def group = parts[0]
-	def name = parts[1]
-	def version = parts[2]
-	if (dependencyDetails.requested.group == group
-			&& dependencyDetails.requested.name == name) {
-		dependencyDetails.useVersion version
-	}
+    def parts = dependencyString.split(':')
+    def group = parts[0]
+    def name = parts[1]
+    def version = parts[2]
+    if (dependencyDetails.requested.group == group
+            && dependencyDetails.requested.name == name) {
+        dependencyDetails.useVersion version
+    }
 }
 
 def setupPigIntegrationTestDependencyVersions(dependencyResolveDetails) {
-	// This is the way we override the dependencies.
-	updateDependencyVersion dependencyResolveDetails, "joda-time:joda-time:2.2"
+    // This is the way we override the dependencies.
+    updateDependencyVersion dependencyResolveDetails, "joda-time:joda-time:2.2"
 }
 
 def setupCrunchIntegrationTestDependencyVersions(dependencyResolveDetails) {
-	// Specify any dependencies that you want to override for crunch integration tests.
+    // Specify any dependencies that you want to override for crunch integration tests.
+}
+
+def setupMahoutIntegrationTestDependencyVersions(dependencyResolveDetails) {
+    // Specify any dependencies that you want to override for mahout integration tests.
 }
 
+
 task integrationTest(type: Test, dependsOn: test) {
 
-	testClassesDir = sourceSets.integrationTest.output.classesDir
-	classpath = sourceSets.integrationTest.runtimeClasspath
-
-	if(!project.hasProperty('ITProfile')) {
-		// skip integration-tests if no profile has been specified.
-		integrationTest.onlyIf { false }
-		return;
-	}
-
-	def patternsToInclude
-	def dependencyConfigClosure
-	def skipDependencyUpdates = false
-	// Select the pattern for test classes that should be executed, and the dependency
-	// configuration function to be called based on the profile name specified at the command line.
-	switch (project.ITProfile) {
-		case "pig":
-			patternsToInclude = "*PigIT*"
-			dependencyConfigClosure = { setupPigIntegrationTestDependencyVersions(it) }
-			break
-		case "crunch":
-			patternsToInclude = "*CrunchIT*"
-			dependencyConfigClosure = { setupCrunchIntegrationTestDependencyVersions(it) }
-			break
-		// skip integration-tests if the passed in profile-name is not valid
-		default: integrationTest.onlyIf { false }; return
-	}
-
-
-	filter { includeTestsMatching patternsToInclude }
-
-	// This is the standard way gradle allows overriding each specific dependency.
-	// see: http://www.gradle.org/docs/current/dsl/org.gradle.api.artifacts.ResolutionStrategy.html
-	project.configurations.all {
-		resolutionStrategy {
-			eachDependency {
-				dependencyConfigClosure(it)
-			}
-		}
-	}
+    testClassesDir = sourceSets.integrationTest.output.classesDir
+    classpath = sourceSets.integrationTest.runtimeClasspath
+
+    if(!project.hasProperty('ITProfile')) {
+        // skip integration-tests if no profile has been specified.
+        integrationTest.onlyIf { false }
+        return;
+    }
+
+    def patternsToInclude
+    def dependencyConfigClosure
+    def skipDependencyUpdates = false
+    // Select the pattern for test classes that should be executed, and the dependency
+    // configuration function to be called based on the profile name specified at the command line.
+    switch (project.ITProfile) {
+        case "pig":
+            patternsToInclude = "*PigIT*"
+            dependencyConfigClosure = { setupPigIntegrationTestDependencyVersions(it) }
+            break
+        case "crunch":
+            patternsToInclude = "*CrunchIT*"
+            dependencyConfigClosure = { setupCrunchIntegrationTestDependencyVersions(it) }
+            break
+        case "mahout":
+            patternsToInclude = "*MahoutIT*"
+            dependencyConfigClosure = { setupMahoutIntegrationTestDependencyVersions(it) }
+            break
+        // skip integration-tests if the passed in profile-name is not valid
+        default: integrationTest.onlyIf { false }; return
+    }
+
+
+    filter { includeTestsMatching patternsToInclude }
+
+    // This is the standard way gradle allows overriding each specific dependency.
+    // see: http://www.gradle.org/docs/current/dsl/org.gradle.api.artifacts.ResolutionStrategy.html
+    project.configurations.all {
+        resolutionStrategy {
+            eachDependency {
+                dependencyConfigClosure(it)
+            }
+        }
+    }
 }
 
 dependencies {
-	compile "org.kohsuke:graphviz-api:1.0"
-	compile "org.apache.crunch:crunch-core:0.9.0-hadoop2"
-	compile "com.jolbox:bonecp:${project.bonecpVersion}"
-	compile "org.apache.derby:derby:${project.derbyVersion}"
-	compile "com.google.guava:guava:${project.guavaVersion}"
-	compile "commons-lang:commons-lang:2.6"
-	compile "joda-time:joda-time:2.3"
-	compile "org.apache.commons:commons-lang3:3.1"
-	compile "com.google.protobuf:protobuf-java:2.5.0"
-	compile "commons-logging:commons-logging:1.1.3"
-	compile "com.thoughtworks.xstream:xstream:+"
-	compile "org.apache.lucene:lucene-core:+"
-	compile "org.apache.lucene:lucene-analyzers-common:+"
-	compile "org.apache.solr:solr-commons-csv:3.5.0"
-	compile "org.apache.hadoop:hadoop-client:${project.hadoopVersion}"
-	compile group: "org.apache.pig", name: "pig", version: "0.12.0", classifier:"h2"
-	compile "org.slf4j:slf4j-api:${project.slf4jVersion}"
-	compile "log4j:log4j:1.2.12"
-	compile "org.slf4j:slf4j-log4j12:${project.slf4jVersion}"
-	compile "org.datanucleus:datanucleus-core:${project.datanucleusVersion}"
-	compile "org.datanucleus:datanucleus-rdbms:${project.datanucleusJpaVersion}"
-	compile "org.datanucleus:datanucleus-api-jdo:${project.datanucleusJpaVersion}"
-	compile "org.datanucleus:datanucleus-accessplatform-jdo-rdbms:${project.datanucleusJpaVersion}"
-	compile group: "org.apache.mrunit", name: "mrunit", version: "1.0.0", classifier:"hadoop2"
-
-	compile 'org.scala-lang:scala-library:2.10.0'
-
-	testCompile "junit:junit:4.11"
-	testCompile "org.hamcrest:hamcrest-all:1.3"
-	testCompile "org.scalatest:scalatest_2.10:2.1.7"
+    compile "org.kohsuke:graphviz-api:1.0"
+    compile "org.apache.crunch:crunch-core:0.9.0-hadoop2"
+    compile "com.jolbox:bonecp:${project.bonecpVersion}"
+    compile "org.apache.derby:derby:${project.derbyVersion}"
+    compile "com.google.guava:guava:${project.guavaVersion}"
+    compile "commons-lang:commons-lang:2.6"
+    compile "joda-time:joda-time:2.3"
+    compile "org.apache.commons:commons-lang3:3.1"
+    compile "com.google.protobuf:protobuf-java:2.5.0"
+    compile "commons-logging:commons-logging:1.1.3"
+    compile "com.thoughtworks.xstream:xstream:+"
+    compile "org.apache.lucene:lucene-core:+"
+    compile "org.apache.lucene:lucene-analyzers-common:+"
+    compile "org.apache.solr:solr-commons-csv:3.5.0"
+
+    compile group: "org.apache.pig", name: "pig", version: "0.12.0", classifier:"h2"
+    compile "org.slf4j:slf4j-api:${project.slf4jVersion}"
+    compile "log4j:log4j:1.2.12"
+    compile "org.slf4j:slf4j-log4j12:${project.slf4jVersion}"
+    compile "org.datanucleus:datanucleus-core:${project.datanucleusVersion}"
+    compile "org.datanucleus:datanucleus-rdbms:${project.datanucleusJpaVersion}"
+    compile "org.datanucleus:datanucleus-api-jdo:${project.datanucleusJpaVersion}"
+    compile "org.datanucleus:datanucleus-accessplatform-jdo-rdbms:${project.datanucleusJpaVersion}"
+    compile group: "org.apache.mrunit", name: "mrunit", version: "1.0.0", classifier:"hadoop2"
+
+    compile "org.jfairy:jfairy:0.2.4"
+
+    // From the Hortonworks repo, which compiles mahout-core against Hadoop 2.x.
+    compile "org.apache.hadoop:hadoop-client:${hadoopVersion}"
+    compile "org.apache.mahout:mahout-core:${mahoutVersion}"
+
+    compile 'org.scala-lang:scala-library:2.11.0'
+
+    testCompile "junit:junit:4.11"
+    testCompile "org.hamcrest:hamcrest-all:1.3"
+    testCompile "org.scalatest:scalatest_2.11:2.1.7"
 }
 
-eclipse {
-	classpath {
-		// Add the sependencies and the src dirs for the integrationTest source-set to the
-		// .classpath file that will be generated by the eclipse plugin.
-		plusConfigurations += configurations.integrationTestCompile
-		// Uncomment the following two lines if you want to generate an eclipse project quickly.
-		downloadSources = false
-		downloadJavadoc = false
-	}
+configurations {
+    /* hadoopClusterRuntime */ runtime {
+	    // extendsFrom integrationTestRuntime
+	    if(project.hasProperty('for-cluster')) {
+		    excludeRules += [getGroup: { 'org.apache.crunch' }, getModule: { 'crunch-core' } ] as ExcludeRule
+		    excludeRules += [getGroup: { 'org.apache.pig' }, getModule: { 'pig' } ] as ExcludeRule
+		    excludeRules += [getGroup: { 'org.apache.mahout' }, getModule: { 'mahout-core' } ] as ExcludeRule
+		    excludeRules += [getGroup: { 'org.apache.hadoop' }, getModule: { 'hadoop-client' } ] as ExcludeRule
+		}
+    }
 }
+
+task listJars << {
+    configurations.shadow.each { println it.name }
+}
+
+def copyDependencyJarsForHadoopCluster() {
+    copy {
+        from configurations.hadoopClusterRuntime
+        into 'build/libs'
+    }
+}
+
+build {
+    doLast {
+        copyDependencyJarsForHadoopCluster()
+    }
+}
+
+eclipse {
+    classpath {
+        // Add the dependencies and the src dirs for the integrationTest source-set to the
+        // .classpath file that will be generated by the eclipse plugin.
+        plusConfigurations += [configurations.integrationTestCompile]
+        // Comment out the following two lines if you want to generate an eclipse project quickly.
+        downloadSources = true
+        downloadJavadoc = false
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/bigtop/blob/4fca4573/bigtop-bigpetstore/src/integrationTest/java/org/apache/bigtop/bigpetstore/BigPetStoreMahoutIT.java
----------------------------------------------------------------------
diff --git a/bigtop-bigpetstore/src/integrationTest/java/org/apache/bigtop/bigpetstore/BigPetStoreMahoutIT.java b/bigtop-bigpetstore/src/integrationTest/java/org/apache/bigtop/bigpetstore/BigPetStoreMahoutIT.java
new file mode 100644
index 0000000..b07c5a0
--- /dev/null
+++ b/bigtop-bigpetstore/src/integrationTest/java/org/apache/bigtop/bigpetstore/BigPetStoreMahoutIT.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bigtop.bigpetstore;
+
+import static org.apache.bigtop.bigpetstore.ITUtils.createTestOutputPath;
+import static org.apache.bigtop.bigpetstore.ITUtils.setup;
+
+import java.util.regex.Pattern;
+
+import org.apache.bigtop.bigpetstore.recommend.ItemRecommender;
+import org.apache.bigtop.bigpetstore.util.BigPetStoreConstants.OUTPUTS.MahoutPaths;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.base.Predicate;
+
+public class BigPetStoreMahoutIT {
+
+  public static final Path INPUT_DIR_PATH =
+          new Path(ITUtils.BPS_TEST_PIG_CLEANED, MahoutPaths.Mahout.name());
+  public static final String INPUT_DIR_PATH_STR = INPUT_DIR_PATH.toString();
+  private static final Path MAHOUT_OUTPUT_DIR = createTestOutputPath(MahoutPaths.Mahout.name());
+  private static final Path ALS_FACTORIZATION_OUTPUT_DIR =
+          createTestOutputPath(MahoutPaths.Mahout.name(), MahoutPaths.AlsFactorization.name());
+  private static final Path ALS_RECOMMENDATIONS_DIR =
+          createTestOutputPath(MahoutPaths.Mahout.name(), MahoutPaths.AlsRecommendations.name());
+
+  private ItemRecommender itemRecommender;
+
+  @Before
+  public void setupTest() throws Throwable {
+    setup();
+    try {
+      FileSystem fs = FileSystem.get(new Configuration());
+      fs.delete(MAHOUT_OUTPUT_DIR, true);
+      itemRecommender = new ItemRecommender(INPUT_DIR_PATH_STR, ALS_FACTORIZATION_OUTPUT_DIR.toString(),
+              ALS_RECOMMENDATIONS_DIR.toString());
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  private static final Predicate<String> TEST_OUTPUT_FORMAT = new Predicate<String>() {
+    private final Pattern p = Pattern.compile("^\\d+\\s\\[\\d+:\\d+\\.\\d+\\]$");
+    @Override
+    public boolean apply(String input) {
+      return p.matcher(input).matches();
+    }
+  };
+
+  @Test
+  public void testPetStorePipeline() throws Exception {
+    itemRecommender.recommend();
+    ITUtils.assertOutput(ALS_RECOMMENDATIONS_DIR, TEST_OUTPUT_FORMAT);
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/bigtop/blob/4fca4573/bigtop-bigpetstore/src/integrationTest/java/org/apache/bigtop/bigpetstore/BigPetStorePigIT.java
----------------------------------------------------------------------
diff --git a/bigtop-bigpetstore/src/integrationTest/java/org/apache/bigtop/bigpetstore/BigPetStorePigIT.java b/bigtop-bigpetstore/src/integrationTest/java/org/apache/bigtop/bigpetstore/BigPetStorePigIT.java
index 045a9cf..78d5c6b 100644
--- a/bigtop-bigpetstore/src/integrationTest/java/org/apache/bigtop/bigpetstore/BigPetStorePigIT.java
+++ b/bigtop-bigpetstore/src/integrationTest/java/org/apache/bigtop/bigpetstore/BigPetStorePigIT.java
@@ -19,26 +19,22 @@ import static org.apache.bigtop.bigpetstore.ITUtils.BPS_TEST_GENERATED;
 import static org.apache.bigtop.bigpetstore.ITUtils.BPS_TEST_PIG_CLEANED;
 import static org.apache.bigtop.bigpetstore.ITUtils.fs;
 
-import java.io.BufferedReader;
 import java.io.File;
-import java.io.InputStreamReader;
 import java.util.Map;
 import java.util.Map.Entry;
 
 import org.apache.bigtop.bigpetstore.etl.PigCSVCleaner;
 import org.apache.bigtop.bigpetstore.util.BigPetStoreConstants;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.pig.ExecType;
-import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.base.Function;
+import com.google.common.base.Predicate;
 import com.google.common.collect.ImmutableMap;
 
 /**
@@ -76,68 +72,24 @@ public class BigPetStorePigIT {
 			FileSystem.get(new Configuration()).delete(BPS_TEST_PIG_CLEANED, true);
 			FileSystem.get(new Configuration()).delete(BPS_TEST_PIG_COUNT_PRODUCTS, true);
 		} catch (Exception e) {
-			System.out.println("didnt need to delete pig output.");
-			// not necessarily an error
+			throw new RuntimeException(e);
 		}
 	}
 
-	static Map<Path, Function<String, Boolean>> TESTS = ImmutableMap.of(
+	static Map<Path, Predicate<String>> TESTS = ImmutableMap.of(
 		/** Test of the main output */
-		BPS_TEST_PIG_CLEANED, new Function<String, Boolean>() {
-			public Boolean apply(String x) {
-				// System.out.println("Verified...");
-				return true;
-			}
-		},
-		// Example of how to count products
-		// after doing basic pig data cleanup
-		BPS_TEST_PIG_COUNT_PRODUCTS, new Function<String, Boolean>() {
-			// Jeff'
-			public Boolean apply(String x) {
-				return true;
-			}
-		}
+		BPS_TEST_PIG_CLEANED, ITUtils.VERIFICATION_PERDICATE,
+		// Example of how to count products after doing basic pig data cleanup
+		BPS_TEST_PIG_COUNT_PRODUCTS, ITUtils.VERIFICATION_PERDICATE,
+		// Test the output that is to be used as an input for Mahout.
+		BigPetStoreMahoutIT.INPUT_DIR_PATH, ITUtils.VERIFICATION_PERDICATE
 	);
 
-	/**
-	 * The "core" task reformats data to TSV. lets test that first.
-	 */
 	@Test
 	public void testPetStoreCorePipeline() throws Exception {
 		runPig(BPS_TEST_GENERATED, BPS_TEST_PIG_CLEANED, PIG_SCRIPT);
-		for (Entry<Path, Function<String, Boolean>> e : TESTS.entrySet()) {
-			assertOutput(e.getKey(), e.getValue());
-		}
-	}
-
-	public static void assertOutput(Path base,
-			Function<String, Boolean> validator) throws Exception {
-		FileSystem fs = FileSystem.getLocal(new Configuration());
-
-		FileStatus[] files = fs.listStatus(base);
-		// print out all the files.
-		for (FileStatus stat : files) {
-			System.out.println(stat.getPath() + "  " + stat.getLen());
-		}
-
-		/**
-		 * Support map OR reduce outputs
-		 */
-		Path partm = new Path(base, "part-m-00000");
-		Path partr = new Path(base, "part-r-00000");
-		Path p = fs.exists(partm) ? partm : partr;
-
-		/**
-		 * Now we read through the file and validate its contents.
-		 */
-		BufferedReader r = new BufferedReader(new InputStreamReader(fs.open(p)));
-
-		// line:{"product":"big chew toy","count":3}
-		while (r.ready()) {
-			String line = r.readLine();
-			log.info("line:" + line);
-			// System.out.println("line:"+line);
-			Assert.assertTrue("validationg line : " + line, validator.apply(line));
+		for (Entry<Path, Predicate<String>> e : TESTS.entrySet()) {
+			ITUtils.assertOutput(e.getKey(), e.getValue());
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/bigtop/blob/4fca4573/bigtop-bigpetstore/src/integrationTest/java/org/apache/bigtop/bigpetstore/ITUtils.java
----------------------------------------------------------------------
diff --git a/bigtop-bigpetstore/src/integrationTest/java/org/apache/bigtop/bigpetstore/ITUtils.java b/bigtop-bigpetstore/src/integrationTest/java/org/apache/bigtop/bigpetstore/ITUtils.java
index df3b948..fd53dc1 100644
--- a/bigtop-bigpetstore/src/integrationTest/java/org/apache/bigtop/bigpetstore/ITUtils.java
+++ b/bigtop-bigpetstore/src/integrationTest/java/org/apache/bigtop/bigpetstore/ITUtils.java
@@ -15,6 +15,8 @@
  */
 package org.apache.bigtop.bigpetstore;
 
+import java.io.BufferedReader;
+import java.io.InputStreamReader;
 import java.net.InetAddress;
 import java.nio.charset.Charset;
 import java.util.List;
@@ -22,15 +24,26 @@ import java.util.List;
 import org.apache.bigtop.bigpetstore.generator.BPSGenerator;
 import org.apache.bigtop.bigpetstore.util.BigPetStoreConstants;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.Job;
+import org.junit.Assert;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.base.Predicate;
 import com.google.common.io.Files;
 
 public class ITUtils {
+  public static final Path TEST_OUTPUT_DIR = new Path("bps_integration_");
+
+  public static Predicate<String> VERIFICATION_PERDICATE = new Predicate<String>() {
+    @Override
+    public boolean apply(String input) {
+      return true;
+    }
+  };
 
 	static final Logger log = LoggerFactory.getLogger(ITUtils.class);
 
@@ -46,26 +59,25 @@ public class ITUtils {
 					msg += cp.replaceAll("hadoop", "**HADOOP**") + "\n";
 				}
 			}
-			throw new RuntimeException("Major error:  Probably issue.   " + "Check hadoop version?  " + e.getMessage()
-					+ " .... check these classpath elements:" + msg);
+			throw new RuntimeException("Major error:  Probably issue.   "
+			        + "Check hadoop version?  " + e.getMessage()
+			        + " .... check these classpath elements:" + msg);
 		}
 	}
-	public static final Path BPS_TEST_GENERATED = fs.makeQualified(new Path("bps_integration_",
-			BigPetStoreConstants.OUTPUTS.generated.name()));
-	public static final Path BPS_TEST_PIG_CLEANED = fs.makeQualified(new Path("bps_integration_",
-			BigPetStoreConstants.OUTPUTS.cleaned.name()));
-	public static final Path BPS_TEST_MAHOUT_IN = fs.makeQualified(new Path("bps_integration_",
-			BigPetStoreConstants.OUTPUTS.MAHOUT_CF_IN.name()));
-	public static final Path BPS_TEST_MAHOUT_OUT = fs.makeQualified(new Path("bps_integration_",
-			BigPetStoreConstants.OUTPUTS.MAHOUT_CF_OUT.name()));
-
-	public static void main(String[] args) {
 
+	public static final Path BPS_TEST_GENERATED =
+	        createTestOutputPath(BigPetStoreConstants.OUTPUTS.generated.name());
+	public static final Path BPS_TEST_PIG_CLEANED =
+	        createTestOutputPath (BigPetStoreConstants.OUTPUTS.cleaned.name());
+
+	public static Path createTestOutputPath(String... pathParts) {
+	  Path path = TEST_OUTPUT_DIR;
+	  for(String pathPart: pathParts) {
+	    path = new Path(path, pathPart);
+	  }
+	  return path;
 	}
 
-	// public static final Path CRUNCH_OUT = new
-	// Path("bps_integration_",BigPetStoreConstants.OUTPUT_3).makeQualified(fs);
-
 	/**
 	 * Some simple checks to make sure that unit tests in local FS. these arent
 	 * designed to be run against a distribtued system.
@@ -99,29 +111,18 @@ public class ITUtils {
 	 * test_data_directory/generated/part-r-00000
 	 */
 	public static void setup() throws Throwable {
-		int records = 10;
-		/**
-		 * Setup configuration with prop.
-		 */
 		Configuration conf = new Configuration();
 
-		// debugging for jeff and others in local fs
-		// that wont build
+		// debugging for Jeff and others in local fs that won't build
 		checkConf(conf);
 
-		conf.setInt(BPSGenerator.props.bigpetstore_records.name(), records);
+		conf.setInt(BPSGenerator.props.bigpetstore_records.name(), BPSGenerator.DEFAULT_NUM_RECORDS);
 
-		/**
-		 * Only create if doesnt exist already.....
-		 */
 		if (FileSystem.getLocal(conf).exists(BPS_TEST_GENERATED)) {
 			return;
 		}
 
-		/**
-		 * Create the data set.
-		 */
-		Job createInput = BPSGenerator.createJob(BPS_TEST_GENERATED, conf);
+		Job createInput = BPSGenerator.getCreateTransactionRecordsJob(BPS_TEST_GENERATED, conf);
 		createInput.waitForCompletion(true);
 
 		Path outputfile = new Path(BPS_TEST_GENERATED, "part-r-00000");
@@ -131,4 +132,37 @@ public class ITUtils {
 			System.out.println(l);
 		}
 	}
+
+
+	// A function that logs each line of the output file and asserts that it passes the validator
+	public static void assertOutput(Path base, Predicate<String> validator) throws Exception {
+	  FileSystem fs = FileSystem.getLocal(new Configuration());
+
+	  FileStatus[] files = fs.listStatus(base);
+	  // print out all the files.
+	  for (FileStatus stat : files) {
+	    System.out.println(stat.getPath() + "  " + stat.getLen());
+	  }
+
+	  /**
+	   * Support map OR reduce outputs
+	   */
+	  Path partm = new Path(base, "part-m-00000");
+	  Path partr = new Path(base, "part-r-00000");
+	  Path p = fs.exists(partm) ? partm : partr;
+
+	  /**
+	   * Now we read through the file and validate its contents.
+	   */
+	  BufferedReader r = new BufferedReader(new InputStreamReader(fs.open(p)));
+
+	  // line:{"product":"big chew toy","count":3}
+	  while (r.ready()) {
+	    String line = r.readLine();
+	    log.info("line:" + line);
+	    // System.out.println("line:"+line);
+	    Assert.assertTrue("validationg line : " + line, validator.apply(line));
+	  }
+	}
+
 }

http://git-wip-us.apache.org/repos/asf/bigtop/blob/4fca4573/bigtop-bigpetstore/src/main/java/org/apache/bigtop/bigpetstore/etl/PigCSVCleaner.java
----------------------------------------------------------------------
diff --git a/bigtop-bigpetstore/src/main/java/org/apache/bigtop/bigpetstore/etl/PigCSVCleaner.java b/bigtop-bigpetstore/src/main/java/org/apache/bigtop/bigpetstore/etl/PigCSVCleaner.java
index 01ddd6e..0ca7444 100644
--- a/bigtop-bigpetstore/src/main/java/org/apache/bigtop/bigpetstore/etl/PigCSVCleaner.java
+++ b/bigtop-bigpetstore/src/main/java/org/apache/bigtop/bigpetstore/etl/PigCSVCleaner.java
@@ -6,7 +6,7 @@
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
  * http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -21,9 +21,8 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-import org.apache.bigtop.bigpetstore.util.BigPetStoreConstants;
+import org.apache.bigtop.bigpetstore.util.BigPetStoreConstants.OUTPUTS;
 import org.apache.bigtop.bigpetstore.util.DeveloperTools;
-import org.apache.bigtop.bigpetstore.util.NumericalIdUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -33,7 +32,7 @@ import org.apache.pig.ExecType;
 import org.apache.pig.PigServer;
 
 /**
- * This class operates by ETL'ing the dataset into pig.
+ * This class operates by ETL'ing the data-set into pig.
  * The pigServer is persisted through the life of the class, so that the
  * intermediate data sets created in the constructor can be reused.
  */
@@ -41,11 +40,12 @@ public class PigCSVCleaner  {
 
     PigServer pigServer;
 
+    private static Path getCleanedTsvPath(Path outputPath) {
+      return new Path(outputPath, OUTPUTS.tsv.name());
+    }
+
     public PigCSVCleaner(Path inputPath, Path outputPath, ExecType ex, File... scripts)
             throws Exception {
-
-
-
         FileSystem fs = FileSystem.get(inputPath.toUri(), new Configuration());
 
         if(! fs.exists(inputPath)){
@@ -61,36 +61,29 @@ public class PigCSVCleaner  {
         /**
          * First, split the tabs up.
          *
-         * BigPetStore,storeCode_OK,2 yang,jay,Mon Dec 15 23:33:49 EST
-         * 1969,69.56,flea collar
-         *
-         * ("BigPetStore,storeCode_OK,2",
-         * "yang,jay,Mon Dec 15 23:33:49 EST 1969,69.56,flea collar")
+         * BigPetStore,storeCode_OK,2 1,yang,jay,3,flea collar,69.56,Mon Dec 15 23:33:49 EST 1969
          *
-         * BigPetStore,storeCode_AK,1 amanda,fitzgerald,Sat Dec 20 09:44:25 EET
-         * 1969,7.5,cat-food
+         * ("BigPetStore,storeCode_OK,2", "1,yang,jay,3,flea collar,69.56,Mon Dec 15 23:33:49 EST 1969")
          */
-        pigServer.registerQuery("csvdata = LOAD '<i>' AS (ID,DETAILS);"
-                .replaceAll("<i>", inputPath.toString()));
+        pigServer.registerQuery("csvdata = LOAD '<i>' AS (ID,DETAILS);".replaceAll("<i>", inputPath.toString()));
 
+        // currentCustomerId, firstName, lastName, product.id, product.name.toLowerCase, product.price, date
         /**
-         * Now, we want to split the two tab delimited feidls into uniform
+         * Now, we want to split the two tab delimited fields into uniform
          * fields of comma separated values. To do this, we 1) Internally split
          * the FIRST and SECOND fields by commas "a,b,c" --> (a,b,c) 2) FLATTEN
          * the FIRST and SECOND fields. (d,e) (a,b,c) -> d e a b c
          */
-        pigServer
-                .registerQuery(
-                        "id_details = FOREACH csvdata GENERATE "
-                        + "FLATTEN" + "(STRSPLIT(ID,',',3)) AS " +
-                        		"(drop, code, transaction) ,"
-
-                        + "FLATTEN" + "(STRSPLIT(DETAILS,',',5)) AS " +
-                            "(lname, fname, date, price," +
-                            "product:chararray);");
-
-        pigServer.store("id_details", outputPath.toString());
-
+        pigServer.registerQuery(
+              "id_details = FOREACH csvdata GENERATE "
+              + "FLATTEN(STRSPLIT(ID, ',', 3)) AS " +
+			"(drop, code, transaction) ,"
+
+              + "FLATTEN(STRSPLIT(DETAILS, ',', 7)) AS " +
+                  "(custId, fname, lname, productId, product:chararray, price, date);");
+        pigServer.registerQuery("mahout_records = FOREACH id_details GENERATE custId, productId, 1;");
+        pigServer.store("id_details", getCleanedTsvPath(outputPath).toString());
+        pigServer.store("mahout_records", new Path(outputPath, OUTPUTS.MahoutPaths.Mahout.name()).toString());
         /**
          * Now we run scripts... this is where you can add some
          * arbitrary analytics.
@@ -102,18 +95,13 @@ public class PigCSVCleaner  {
          */
         int i = 0;
         for(File script : scripts) {
-            Map<String,String> parameters = new HashMap<String,String>();
-            parameters.put("input",
-                    outputPath.toString());
+            Map<String,String> parameters = new HashMap<>();
+            parameters.put("input", getCleanedTsvPath(outputPath).toString());
 
             Path dir = outputPath.getParent();
-            Path adHocOut=
-                    new Path(
-                            dir,
-                            BigPetStoreConstants.OUTPUTS.pig_ad_hoc_script.name()+(i++));
+            Path adHocOut = new Path(dir, OUTPUTS.pig_ad_hoc_script.name() + (i++));
             System.out.println("Setting default output to " + adHocOut);
             parameters.put("output", adHocOut.toString());
-
             pigServer.registerScript(script.getAbsolutePath(), parameters);
         }
     }
@@ -123,7 +111,7 @@ public class PigCSVCleaner  {
         for(int i = startIndex ; i < args.length ; i++) {
             File f = new File(args[i]);
             if(! f.exists()) {
-                throw new RuntimeException("Pig script arg " + i+ " " + f.getAbsolutePath() + " not found. ");
+                throw new RuntimeException("Pig script arg " + i + " " + f.getAbsolutePath() + " not found. ");
             }
             files.add(f);
         }
@@ -133,14 +121,11 @@ public class PigCSVCleaner  {
                 "Each one will be given $input and $output arguments.");
         return files.toArray(new File[]{});
     }
+
     public static void main(final String[] args) throws Exception {
         System.out.println("Starting pig etl " + args.length);
-
         Configuration c = new Configuration();
-        int res = ToolRunner.run(
-                c,
-
-                new Tool() {
+        int res = ToolRunner.run(c, new Tool() {
                     Configuration conf;
                     @Override
                     public void setConf(Configuration conf) {

http://git-wip-us.apache.org/repos/asf/bigtop/blob/4fca4573/bigtop-bigpetstore/src/main/java/org/apache/bigtop/bigpetstore/generator/BPSGenerator.java
----------------------------------------------------------------------
diff --git a/bigtop-bigpetstore/src/main/java/org/apache/bigtop/bigpetstore/generator/BPSGenerator.java b/bigtop-bigpetstore/src/main/java/org/apache/bigtop/bigpetstore/generator/BPSGenerator.java
index 3319064..6c8beef 100755
--- a/bigtop-bigpetstore/src/main/java/org/apache/bigtop/bigpetstore/generator/BPSGenerator.java
+++ b/bigtop-bigpetstore/src/main/java/org/apache/bigtop/bigpetstore/generator/BPSGenerator.java
@@ -6,7 +6,7 @@
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
  * http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -33,6 +33,7 @@ import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import static org.apache.bigtop.bigpetstore.generator.PetStoreTransactionsInputFormat.props;
 
 /**
  * This is a mapreduce implementation of a generator of a large sentiment
@@ -46,71 +47,62 @@ import org.slf4j.LoggerFactory;
  */
 public class BPSGenerator {
 
-    final static Logger log = LoggerFactory.getLogger(BPSGenerator.class);
-
-    public enum props {
-        // bigpetstore_splits,
-        bigpetstore_records
-    }
+  public static final int DEFAULT_NUM_RECORDS = 100;
 
-    public static Job createJob(Path output, int records) throws IOException {
-        Configuration c = new Configuration();
-        c.setInt(props.bigpetstore_records.name(), 10);
-        return createJob(output, c);
-    }
+  final static Logger log = LoggerFactory.getLogger(BPSGenerator.class);
 
-    public static Job createJob(Path output, Configuration conf)
-            throws IOException {
-        Job job = new Job(conf, "PetStoreTransaction_ETL_"
-                + System.currentTimeMillis());
-        // recursively delete the data set if it exists.
-        FileSystem.get(output.toUri(),conf).delete(output, true);
-        job.setJarByClass(BPSGenerator.class);
-        job.setMapperClass(MyMapper.class);
-        // use the default reducer
-        // job.setReducerClass(PetStoreTransactionGeneratorJob.Red.class);
-        job.setOutputKeyClass(Text.class);
-        job.setOutputValueClass(Text.class);
-        job.setMapOutputKeyClass(Text.class);
-        job.setMapOutputValueClass(Text.class);
-        job.setInputFormatClass(GeneratePetStoreTransactionsInputFormat.class);
-        job.setOutputFormatClass(TextOutputFormat.class);
-        FileOutputFormat.setOutputPath(job, output);
-        return job;
-    }
+  public enum props {
+    bigpetstore_records
+  }
 
-    public static class MyMapper extends Mapper<Text, Text, Text, Text> {
+  public static Job createJob(Path output, int records) throws IOException {
+    Configuration c = new Configuration();
+    c.setInt(props.bigpetstore_records.name(), records); // honor the requested count, not the default
+    return getCreateTransactionRecordsJob(output, c);
+  }
 
-        @Override
-        protected void setup(Context context) throws IOException,
-                InterruptedException {
-            super.setup(context);
-        }
+  public static Job getCreateTransactionRecordsJob(Path outputDir, Configuration conf)
+          throws IOException {
+    Job job = new Job(conf, "PetStoreTransaction_ETL_" + System.currentTimeMillis());
+    // recursively delete the data set if it exists.
+    FileSystem.get(outputDir.toUri(), conf).delete(outputDir, true);
+    job.setJarByClass(BPSGenerator.class);
+    job.setMapperClass(MyMapper.class);
+    // use the default reducer
+    // job.setReducerClass(PetStoreTransactionGeneratorJob.Red.class);
+    job.setOutputKeyClass(Text.class);
+    job.setOutputValueClass(Text.class);
+    job.setMapOutputKeyClass(Text.class);
+    job.setMapOutputValueClass(Text.class);
+    job.setInputFormatClass(PetStoreTransactionsInputFormat.class);
+    job.setOutputFormatClass(TextOutputFormat.class);
+    FileOutputFormat.setOutputPath(job, outputDir);
+    return job;
+  }
 
-        protected void map(Text key, Text value, Context context)
-                throws java.io.IOException, InterruptedException {
-            context.write(key, value);
-            // TODO: Add multiple outputs here which writes mock addresses for
-            // generated users
-            // to a corresponding data file.
-        };
+  public static class MyMapper extends Mapper<Text, Text, Text, Text> {
+    @Override
+    protected void setup(Context context) throws IOException,
+    InterruptedException {
+      super.setup(context);
     }
 
-    public static void main(String args[]) throws Exception {
-        if (args.length != 2) {
-            System.err.println("USAGE : [number of records] [output path]");
-            System.exit(0);
-        } else {
-            Configuration conf = new Configuration();
-            DeveloperTools.validate(
-                    args,
-                    "# of records",
-                    "output path");
+    protected void map(Text key, Text value, Context context)
+            throws java.io.IOException, InterruptedException {
+      context.write(key, value);
+    }
+  }
 
-            conf.setInt(
-                    GeneratePetStoreTransactionsInputFormat.props.bigpetstore_records.name(),
-                    Integer.parseInt(args[0]));
-            createJob(new Path(args[1]), conf).waitForCompletion(true);
-        }
+  public static void main(String args[]) throws Exception {
+    if (args.length != 2) {
+      System.err.println("USAGE : [number of records] [output path]");
+      System.exit(0);
+    } else {
+      Configuration conf = new Configuration();
+      DeveloperTools.validate(args, "# of records", "output path");
+      conf.setInt(PetStoreTransactionsInputFormat.props.bigpetstore_records.name(),
+              Integer.parseInt(args[0]));
+      getCreateTransactionRecordsJob(new Path(args[1]), conf).waitForCompletion(true);
     }
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/bigtop/blob/4fca4573/bigtop-bigpetstore/src/main/java/org/apache/bigtop/bigpetstore/generator/CustomerGenerator.scala
----------------------------------------------------------------------
diff --git a/bigtop-bigpetstore/src/main/java/org/apache/bigtop/bigpetstore/generator/CustomerGenerator.scala b/bigtop-bigpetstore/src/main/java/org/apache/bigtop/bigpetstore/generator/CustomerGenerator.scala
new file mode 100644
index 0000000..ef4ffb7
--- /dev/null
+++ b/bigtop-bigpetstore/src/main/java/org/apache/bigtop/bigpetstore/generator/CustomerGenerator.scala
@@ -0,0 +1,80 @@
+package org.apache.bigtop.bigpetstore.generator
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.FileSystem
+import org.apache.bigtop.bigpetstore.generator.util.State
+import org.apache.hadoop.fs.Path
+import parquet.org.codehaus.jackson.format.DataFormatDetector
+import org.slf4j.LoggerFactory
+import java.util.{Collection => JavaCollection}
+import scala.collection.JavaConversions.asJavaCollection
+import java.util.Random
+import scala.collection.mutable.{HashMap, Set, MultiMap}
+import scala.collection.immutable.NumericRange
+
+/**
+ * This class generates random customer data. The generated customer
+ * ids will be consecutive. The client code that generates the transactions
+ * records needs to know the available customer ids. If we keep the customer
+ * ids consecutive here, we don't have to store those ids in memory, or perform
+ * costly lookups. Once we introduce something that allows efficient lookup
+ * of data, we can do something else as well.
+ *
+ * The generated customer ids will start from 1. So, if we have 100 customers,
+ * the ids will be [1, 100].
+ */
+class CustomerGenerator(val desiredCustomerCount: Int, val outputPath: Path) {
+  private val logger = LoggerFactory.getLogger(getClass)
+  private val random = new Random;
+  private val assertion = "The generateCustomerRecords() hasn't been called yet";
+  private var customerFileGenerated = false
+  private val _stateToCustomerIds = new HashMap[State, NumericRange[Long]]
+
+  def isCustomerFileGenrated = customerFileGenerated
+
+  def customerIds(state: State) = {
+    assert(customerFileGenerated, assertion)
+    _stateToCustomerIds(state)
+  }
+
+  def generateCustomerRecords() = {
+    val config = new Configuration
+    val fs = FileSystem.getLocal(config)
+
+    assert(!fs.exists(outputPath))
+
+    val outputStream = fs.create(outputPath)
+
+    var currentId: Long = 1
+    logger.info("Generating customer records at: {}", fs.pathToFile(outputPath))
+    for (state <- State.values();
+            stateCustomerCount = (state.probability * desiredCustomerCount) toLong;
+            random = new Random(state.hashCode);
+            i <- 1L to stateCustomerCount) {
+      val customerRecord = CustomerGenerator.createRecord(currentId, state, random);
+      logger.info("generated customer: {}", customerRecord)
+      outputStream.writeBytes(customerRecord)
+
+      if(i == 1) {
+        val stateCustomerIdRange = currentId until (currentId + stateCustomerCount);
+        _stateToCustomerIds += (state -> stateCustomerIdRange)
+      }
+      currentId += 1
+    }
+
+    println(_stateToCustomerIds)
+    outputStream.flush
+    outputStream.close
+    customerFileGenerated = true
+  }
+}
+
+object CustomerGenerator {
+  val OUTPUT_FILE_NAME = "customers"
+
+  private def createRecord(id: Long, state: State, r: Random) = {
+    val firstName = DataForger.firstName(r) // draw names from the caller-supplied Random only
+    val lastName = DataForger.lastName(r)
+    s"$id\t$firstName\t$lastName\t${state.name}\n"
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/bigtop/blob/4fca4573/bigtop-bigpetstore/src/main/java/org/apache/bigtop/bigpetstore/generator/GeneratePetStoreTransactionsInputFormat.java
----------------------------------------------------------------------
diff --git a/bigtop-bigpetstore/src/main/java/org/apache/bigtop/bigpetstore/generator/GeneratePetStoreTransactionsInputFormat.java b/bigtop-bigpetstore/src/main/java/org/apache/bigtop/bigpetstore/generator/GeneratePetStoreTransactionsInputFormat.java
deleted file mode 100755
index a779428..0000000
--- a/bigtop-bigpetstore/src/main/java/org/apache/bigtop/bigpetstore/generator/GeneratePetStoreTransactionsInputFormat.java
+++ /dev/null
@@ -1,134 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.bigtop.bigpetstore.generator;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-
-import org.apache.bigtop.bigpetstore.generator.TransactionIteratorFactory.KeyVal;
-import org.apache.bigtop.bigpetstore.generator.TransactionIteratorFactory.STATE;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-
-/**
- * A simple input split that fakes input.
- */
-public class GeneratePetStoreTransactionsInputFormat extends
-        FileInputFormat<Text, Text> {
-
-    @Override
-    public RecordReader<Text, Text> createRecordReader(
-            final InputSplit inputSplit, TaskAttemptContext arg1)
-            throws IOException, InterruptedException {
-        return new RecordReader<Text, Text>() {
-
-            @Override
-            public void close() throws IOException {
-
-            }
-
-            /**
-             * We need the "state" information to generate records. - Each state
-             * has a probability associated with it, so that our data set can be
-             * realistic (i.e. Colorado should have more transactions than rhode
-             * island).
-             *
-             * - Each state also will its name as part of the key.
-             *
-             * - This task would be distributed, for example, into 50 nodes on a
-             * real cluster, each creating the data for a given state.
-             */
-
-            // String storeCode = ((Split) inputSplit).storeCode;
-            int records = ((PetStoreTransactionInputSplit) inputSplit).records;
-            Iterator<KeyVal<String, String>> data = (new TransactionIteratorFactory(
-                    records, ((PetStoreTransactionInputSplit) inputSplit).state))
-                    .getData();
-            KeyVal<String, String> currentRecord;
-
-            @Override
-            public Text getCurrentKey() throws IOException,
-                    InterruptedException {
-                return new Text(currentRecord.key);
-            }
-
-            @Override
-            public Text getCurrentValue() throws IOException,
-                    InterruptedException {
-                return new Text(currentRecord.val);
-            }
-
-            @Override
-            public void initialize(InputSplit arg0, TaskAttemptContext arg1)
-                    throws IOException, InterruptedException {
-            }
-
-            @Override
-            public boolean nextKeyValue() throws IOException,
-                    InterruptedException {
-                if (data.hasNext()) {
-                    currentRecord = data.next();
-                    return true;
-                }
-                return false;
-            }
-
-            @Override
-            public float getProgress() throws IOException, InterruptedException {
-                return 0f;
-            }
-
-        };
-    }
-
-    public enum props {
-        // bigpetstore_splits,
-        bigpetstore_records
-    }
-
-    @Override
-    public List<InputSplit> getSplits(JobContext arg) throws IOException {
-        int num_records_desired = arg
-                .getConfiguration()
-                .getInt(GeneratePetStoreTransactionsInputFormat.props.bigpetstore_records
-                        .name(), -1);
-        if (num_records_desired == -1) {
-            throw new RuntimeException(
-                    "# of total records not set in configuration object: "
-                            + arg.getConfiguration());
-        }
-
-        ArrayList<InputSplit> list = new ArrayList<InputSplit>();
-
-        /**
-         * Generator class will take a state as input and generate all the data
-         * for that state.
-         */
-        for (TransactionIteratorFactory.STATE s : STATE.values()) {
-            PetStoreTransactionInputSplit split = new PetStoreTransactionInputSplit(
-                    (int) (Math.ceil(num_records_desired * s.probability)), s);
-            System.out.println(s + " _ " + split.records);
-            list.add(split);
-        }
-        return list;
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/bigtop/blob/4fca4573/bigtop-bigpetstore/src/main/java/org/apache/bigtop/bigpetstore/generator/PetStoreTransactionInputSplit.java
----------------------------------------------------------------------
diff --git a/bigtop-bigpetstore/src/main/java/org/apache/bigtop/bigpetstore/generator/PetStoreTransactionInputSplit.java b/bigtop-bigpetstore/src/main/java/org/apache/bigtop/bigpetstore/generator/PetStoreTransactionInputSplit.java
index 9b32344..d350cc8 100755
--- a/bigtop-bigpetstore/src/main/java/org/apache/bigtop/bigpetstore/generator/PetStoreTransactionInputSplit.java
+++ b/bigtop-bigpetstore/src/main/java/org/apache/bigtop/bigpetstore/generator/PetStoreTransactionInputSplit.java
@@ -6,7 +6,7 @@
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
  * http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -19,7 +19,8 @@ import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 
-import org.apache.bigtop.bigpetstore.generator.TransactionIteratorFactory.STATE;
+import org.apache.bigtop.bigpetstore.generator.util.State;
+import org.apache.commons.lang3.Range;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapreduce.InputSplit;
 
@@ -38,21 +39,26 @@ public class PetStoreTransactionInputSplit extends InputSplit implements
     }
 
     public int records;
-    public STATE state;
+    public State state;
+    public Range<Long> customerIdRange;
 
-    public PetStoreTransactionInputSplit(int records, STATE state) {
+    public PetStoreTransactionInputSplit(int records, Range<Long> customerIdRange, State state) {
         this.records = records;
         this.state = state;
+        this.customerIdRange = customerIdRange;
     }
 
-    public void readFields(DataInput arg0) throws IOException {
-        records = arg0.readInt();
-        state = STATE.valueOf(arg0.readUTF());
+    public void readFields(DataInput dataInputStream) throws IOException {
+        records = dataInputStream.readInt();
+        state = State.valueOf(dataInputStream.readUTF());
+        customerIdRange = Range.between(dataInputStream.readLong(), dataInputStream.readLong());
     }
 
-    public void write(DataOutput arg0) throws IOException {
-        arg0.writeInt(records);
-        arg0.writeUTF(state.name());
+    public void write(DataOutput dataOutputStream) throws IOException {
+        dataOutputStream.writeInt(records);
+        dataOutputStream.writeUTF(state.name());
+        dataOutputStream.writeLong(customerIdRange.getMinimum());
+        dataOutputStream.writeLong(customerIdRange.getMaximum());
     }
 
     @Override
@@ -62,6 +68,6 @@ public class PetStoreTransactionInputSplit extends InputSplit implements
 
     @Override
     public long getLength() throws IOException, InterruptedException {
-        return 100;
+        return records;
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/bigtop/blob/4fca4573/bigtop-bigpetstore/src/main/java/org/apache/bigtop/bigpetstore/generator/PetStoreTransactionsInputFormat.java
----------------------------------------------------------------------
diff --git a/bigtop-bigpetstore/src/main/java/org/apache/bigtop/bigpetstore/generator/PetStoreTransactionsInputFormat.java b/bigtop-bigpetstore/src/main/java/org/apache/bigtop/bigpetstore/generator/PetStoreTransactionsInputFormat.java
new file mode 100755
index 0000000..4c22e36
--- /dev/null
+++ b/bigtop-bigpetstore/src/main/java/org/apache/bigtop/bigpetstore/generator/PetStoreTransactionsInputFormat.java
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bigtop.bigpetstore.generator;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.bigtop.bigpetstore.generator.TransactionIteratorFactory.KeyVal;
+import org.apache.bigtop.bigpetstore.generator.util.State;
+import org.apache.commons.lang3.Range;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+
+/**
+ * A simple input format that fakes input; it produces one split per state.
+ */
+public class PetStoreTransactionsInputFormat extends
+    FileInputFormat<Text, Text> {
+
+  @Override
+  public RecordReader<Text, Text> createRecordReader(
+          final InputSplit inputSplit, TaskAttemptContext arg1)
+                  throws IOException, InterruptedException {
+    return new RecordReader<Text, Text>() {
+
+      @Override
+      public void close() throws IOException {
+
+      }
+
+      /**
+       * We need the "state" information to generate records. - Each state
+       * has a probability associated with it, so that our data set can be
+       * realistic (e.g. Colorado should have more transactions than Rhode
+       * Island).
+       *
+       * - Each state will also have its name as part of the key.
+       *
+       * - This task would be distributed, for example, into 50 nodes on a
+       * real cluster, each creating the data for a given state.
+       */
+
+      PetStoreTransactionInputSplit bpsInputplit = (PetStoreTransactionInputSplit) inputSplit;
+      int records = bpsInputplit.records;
+      // TODO why not send the whole InputSplit there?
+      Iterator<KeyVal<String, String>> data =
+              (new TransactionIteratorFactory(records, bpsInputplit.customerIdRange, bpsInputplit.state)).data();
+      KeyVal<String, String> currentRecord;
+
+      @Override
+      public Text getCurrentKey() throws IOException,
+      InterruptedException {
+        return new Text(currentRecord.key());
+      }
+
+      @Override
+      public Text getCurrentValue() throws IOException,
+      InterruptedException {
+        return new Text(currentRecord.value());
+      }
+
+      @Override
+      public void initialize(InputSplit arg0, TaskAttemptContext arg1)
+              throws IOException, InterruptedException {
+      }
+
+      @Override
+      public boolean nextKeyValue() throws IOException,
+      InterruptedException {
+        if (data.hasNext()) {
+          currentRecord = data.next();
+          return true;
+        }
+        return false;
+      }
+
+      @Override
+      public float getProgress() throws IOException, InterruptedException {
+        return 0f;
+      }
+
+    };
+  }
+
+  public enum props {
+    bigpetstore_records
+  }
+
+  @Override
+  public List<InputSplit> getSplits(JobContext arg) throws IOException {
+    int numRecordsDesired = arg
+            .getConfiguration()
+            .getInt(PetStoreTransactionsInputFormat.props.bigpetstore_records
+                    .name(), -1);
+    if (numRecordsDesired == -1) {
+      throw new RuntimeException(
+              "# of total records not set in configuration object: "
+                      + arg.getConfiguration());
+    }
+
+    List<InputSplit> list = new ArrayList<InputSplit>();
+    long customerIdStart = 1;
+    for (State s : State.values()) {
+      int numRecords = numRecords(numRecordsDesired, s.probability);
+      // each state is assigned a range of customer-ids from which it can choose.
+      // The number of customers can be as many as the number of transactions.
+      Range<Long> customerIdRange = Range.between(customerIdStart, customerIdStart + numRecords - 1);
+      PetStoreTransactionInputSplit split =
+              new PetStoreTransactionInputSplit(numRecords, customerIdRange, s);
+      System.out.println(s + " _ " + split.records);
+      list.add(split);
+      customerIdStart += numRecords;
+    }
+    return list;
+  }
+
+  private int numRecords(int numRecordsDesired, float probability) {
+    return (int) (Math.ceil(numRecordsDesired * probability));
+  }
+}
\ No newline at end of file