Posted to commits@mahout.apache.org by ra...@apache.org on 2018/06/28 14:55:00 UTC

[32/51] [partial] mahout git commit: NO-JIRA Clean up MR refactor

http://git-wip-us.apache.org/repos/asf/mahout/blob/410ed16a/community/mahout-mr/mr-examples/src/main/java/org/apache/mahout/clustering/streaming/tools/IOUtils.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/mr-examples/src/main/java/org/apache/mahout/clustering/streaming/tools/IOUtils.java b/community/mahout-mr/mr-examples/src/main/java/org/apache/mahout/clustering/streaming/tools/IOUtils.java
new file mode 100644
index 0000000..bd1149b
--- /dev/null
+++ b/community/mahout-mr/mr-examples/src/main/java/org/apache/mahout/clustering/streaming/tools/IOUtils.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.clustering.streaming.tools;
+
+import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Iterables;
+import org.apache.mahout.clustering.iterator.ClusterWritable;
+import org.apache.mahout.clustering.streaming.mapreduce.CentroidWritable;
+import org.apache.mahout.math.Centroid;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.VectorWritable;
+
+public class IOUtils {
+
+  private IOUtils() {}
+
+  /**
+   * Converts CentroidWritable values in a sequence file into Centroids lazily.
+   * @param dirIterable the source iterable (comes from a SequenceFileDirIterable).
+   * @return an Iterable<Centroid> with the converted vectors.
+   */
+  public static Iterable<Centroid> getCentroidsFromCentroidWritableIterable(
+      Iterable<CentroidWritable>  dirIterable) {
+    return Iterables.transform(dirIterable, new Function<CentroidWritable, Centroid>() {
+      @Override
+      public Centroid apply(CentroidWritable input) {
+        Preconditions.checkNotNull(input);
+        return input.getCentroid().clone();
+      }
+    });
+  }
+
+  /**
+   * Converts ClusterWritable values in a sequence file into Centroids lazily.
+   * @param dirIterable the source iterable (comes from a SequenceFileDirIterable).
+   * @return an Iterable<Centroid> with the converted centroids.
+   */
+  public static Iterable<Centroid> getCentroidsFromClusterWritableIterable(Iterable<ClusterWritable>  dirIterable) {
+    return Iterables.transform(dirIterable, new Function<ClusterWritable, Centroid>() {
+      int numClusters = 0;
+      @Override
+      public Centroid apply(ClusterWritable input) {
+        Preconditions.checkNotNull(input);
+        return new Centroid(numClusters++, input.getValue().getCenter().clone(),
+            input.getValue().getTotalObservations());
+      }
+    });
+  }
+
+  /**
+   * Converts VectorWritable values in a sequence file into Vectors lazily.
+   * @param dirIterable the source iterable (comes from a SequenceFileDirIterable).
+   * @return an Iterable<Vector> with the converted vectors.
+   */
+  public static Iterable<Vector> getVectorsFromVectorWritableIterable(Iterable<VectorWritable> dirIterable) {
+    return Iterables.transform(dirIterable, new Function<VectorWritable, Vector>() {
+      @Override
+      public Vector apply(VectorWritable input) {
+        Preconditions.checkNotNull(input);
+        return input.get().clone();
+      }
+    });
+  }
+}
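
For context, a minimal usage sketch of these converters. It assumes Mahout's
SequenceFileDirValueIterable/PathType/PathFilters helpers and a hypothetical
clusters directory, so treat it as an illustrative sketch rather than part of
this change.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.mahout.clustering.iterator.ClusterWritable;
    import org.apache.mahout.clustering.streaming.tools.IOUtils;
    import org.apache.mahout.common.iterator.sequencefile.PathFilters;
    import org.apache.mahout.common.iterator.sequencefile.PathType;
    import org.apache.mahout.common.iterator.sequencefile.SequenceFileDirValueIterable;
    import org.apache.mahout.math.Centroid;

    public class IOUtilsUsageSketch {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Hypothetical directory of ClusterWritable sequence files, e.g. a clusters-*-final output.
        Path clusters = new Path("output/clusters-0-final");
        Iterable<ClusterWritable> writables =
            new SequenceFileDirValueIterable<>(clusters, PathType.LIST, PathFilters.logsCRCFilter(), conf);
        // The conversion is lazy: each ClusterWritable becomes a Centroid only as it is iterated.
        for (Centroid centroid : IOUtils.getCentroidsFromClusterWritableIterable(writables)) {
          System.out.println(centroid);
        }
      }
    }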

http://git-wip-us.apache.org/repos/asf/mahout/blob/410ed16a/community/mahout-mr/mr-examples/src/main/java/org/apache/mahout/clustering/syntheticcontrol/canopy/Job.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/mr-examples/src/main/java/org/apache/mahout/clustering/syntheticcontrol/canopy/Job.java b/community/mahout-mr/mr-examples/src/main/java/org/apache/mahout/clustering/syntheticcontrol/canopy/Job.java
new file mode 100644
index 0000000..083cd8c
--- /dev/null
+++ b/community/mahout-mr/mr-examples/src/main/java/org/apache/mahout/clustering/syntheticcontrol/canopy/Job.java
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.clustering.syntheticcontrol.canopy;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.mahout.clustering.canopy.CanopyDriver;
+import org.apache.mahout.clustering.conversion.InputDriver;
+import org.apache.mahout.common.AbstractJob;
+import org.apache.mahout.common.ClassUtils;
+import org.apache.mahout.common.HadoopUtil;
+import org.apache.mahout.common.commandline.DefaultOptionCreator;
+import org.apache.mahout.common.distance.DistanceMeasure;
+import org.apache.mahout.common.distance.EuclideanDistanceMeasure;
+import org.apache.mahout.utils.clustering.ClusterDumper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Deprecated
+public final class Job extends AbstractJob {
+
+  private static final String DIRECTORY_CONTAINING_CONVERTED_INPUT = "data";
+
+  private Job() {
+  }
+
+  private static final Logger log = LoggerFactory.getLogger(Job.class);
+
+  public static void main(String[] args) throws Exception {
+    if (args.length > 0) {
+      log.info("Running with only user-supplied arguments");
+      ToolRunner.run(new Configuration(), new Job(), args);
+    } else {
+      log.info("Running with default arguments");
+      Path output = new Path("output");
+      HadoopUtil.delete(new Configuration(), output);
+      run(new Path("testdata"), output, new EuclideanDistanceMeasure(), 80, 55);
+    }
+  }
+
+  /**
+   * Run the canopy clustering job on an input dataset using the given distance
+   * measure, t1 and t2 parameters. All output data will be written to the
+   * output directory, which will be deleted first if it already exists. The
+   * clustered points will reside in the path <output>/clusteredPoints. By
+   * default, the job expects a file containing synthetic_control.data, as
+   * obtained from
+   * http://archive.ics.uci.edu/ml/datasets/Synthetic+Control+Chart+Time+Series,
+   * to reside in a directory named "testdata", and writes output to a directory
+   * named "output".
+   *
+   * @param input
+   *          the Path to the input directory
+   * @param output
+   *          the Path to the output directory
+   * @param measure
+   *          the DistanceMeasure to use
+   * @param t1
+   *          the canopy T1 threshold
+   * @param t2
+   *          the canopy T2 threshold
+   */
+  private static void run(Path input, Path output, DistanceMeasure measure,
+      double t1, double t2) throws Exception {
+    Path directoryContainingConvertedInput = new Path(output,
+        DIRECTORY_CONTAINING_CONVERTED_INPUT);
+    InputDriver.runJob(input, directoryContainingConvertedInput,
+        "org.apache.mahout.math.RandomAccessSparseVector");
+    CanopyDriver.run(new Configuration(), directoryContainingConvertedInput,
+        output, measure, t1, t2, true, 0.0, false);
+    // run ClusterDumper
+    ClusterDumper clusterDumper = new ClusterDumper(new Path(output,
+        "clusters-0-final"), new Path(output, "clusteredPoints"));
+    clusterDumper.printClusters(null);
+  }
+
+  @Override
+  public int run(String[] args) throws Exception {
+
+    addInputOption();
+    addOutputOption();
+    addOption(DefaultOptionCreator.distanceMeasureOption().create());
+    addOption(DefaultOptionCreator.t1Option().create());
+    addOption(DefaultOptionCreator.t2Option().create());
+    addOption(DefaultOptionCreator.overwriteOption().create());
+
+    Map<String, List<String>> argMap = parseArguments(args);
+    if (argMap == null) {
+      return -1;
+    }
+
+    Path input = getInputPath();
+    Path output = getOutputPath();
+    if (hasOption(DefaultOptionCreator.OVERWRITE_OPTION)) {
+      HadoopUtil.delete(new Configuration(), output);
+    }
+    String measureClass = getOption(DefaultOptionCreator.DISTANCE_MEASURE_OPTION);
+    double t1 = Double.parseDouble(getOption(DefaultOptionCreator.T1_OPTION));
+    double t2 = Double.parseDouble(getOption(DefaultOptionCreator.T2_OPTION));
+    DistanceMeasure measure = ClassUtils.instantiateAs(measureClass, DistanceMeasure.class);
+
+    run(input, output, measure, t1, t2);
+    return 0;
+  }
+
+}
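
A quick sketch of driving this job from code, mirroring what the mahout launcher
script would do. The long option names (--input, --output, --distanceMeasure,
--t1, --t2, --overwrite) follow DefaultOptionCreator's usual conventions and the
paths are hypothetical, so treat this as a sketch only.

    import org.apache.mahout.clustering.syntheticcontrol.canopy.Job;

    public class CanopyJobSketch {
      public static void main(String[] args) throws Exception {
        // Same values the no-argument default run uses: testdata/output, t1=80, t2=55.
        Job.main(new String[] {
            "--input", "testdata",
            "--output", "output",
            "--distanceMeasure", "org.apache.mahout.common.distance.EuclideanDistanceMeasure",
            "--t1", "80",
            "--t2", "55",
            "--overwrite"
        });
      }
    }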

http://git-wip-us.apache.org/repos/asf/mahout/blob/410ed16a/community/mahout-mr/mr-examples/src/main/java/org/apache/mahout/clustering/syntheticcontrol/fuzzykmeans/Job.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/mr-examples/src/main/java/org/apache/mahout/clustering/syntheticcontrol/fuzzykmeans/Job.java b/community/mahout-mr/mr-examples/src/main/java/org/apache/mahout/clustering/syntheticcontrol/fuzzykmeans/Job.java
new file mode 100644
index 0000000..43beb78
--- /dev/null
+++ b/community/mahout-mr/mr-examples/src/main/java/org/apache/mahout/clustering/syntheticcontrol/fuzzykmeans/Job.java
@@ -0,0 +1,144 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.clustering.syntheticcontrol.fuzzykmeans;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.mahout.clustering.canopy.CanopyDriver;
+import org.apache.mahout.clustering.conversion.InputDriver;
+import org.apache.mahout.clustering.fuzzykmeans.FuzzyKMeansDriver;
+import org.apache.mahout.common.AbstractJob;
+import org.apache.mahout.common.ClassUtils;
+import org.apache.mahout.common.HadoopUtil;
+import org.apache.mahout.common.commandline.DefaultOptionCreator;
+import org.apache.mahout.common.distance.DistanceMeasure;
+import org.apache.mahout.common.distance.EuclideanDistanceMeasure;
+import org.apache.mahout.common.distance.SquaredEuclideanDistanceMeasure;
+import org.apache.mahout.utils.clustering.ClusterDumper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public final class Job extends AbstractJob {
+  
+  private static final Logger log = LoggerFactory.getLogger(Job.class);
+  
+  private static final String DIRECTORY_CONTAINING_CONVERTED_INPUT = "data";
+  
+  private static final String M_OPTION = FuzzyKMeansDriver.M_OPTION;
+  
+  private Job() {
+  }
+  
+  public static void main(String[] args) throws Exception {
+    if (args.length > 0) {
+      log.info("Running with only user-supplied arguments");
+      ToolRunner.run(new Configuration(), new Job(), args);
+    } else {
+      log.info("Running with default arguments");
+      Path output = new Path("output");
+      Configuration conf = new Configuration();
+      HadoopUtil.delete(conf, output);
+      run(conf, new Path("testdata"), output, new EuclideanDistanceMeasure(), 80, 55, 10, 2.0f, 0.5);
+    }
+  }
+  
+  @Override
+  public int run(String[] args) throws Exception {
+    addInputOption();
+    addOutputOption();
+    addOption(DefaultOptionCreator.distanceMeasureOption().create());
+    addOption(DefaultOptionCreator.convergenceOption().create());
+    addOption(DefaultOptionCreator.maxIterationsOption().create());
+    addOption(DefaultOptionCreator.overwriteOption().create());
+    addOption(DefaultOptionCreator.t1Option().create());
+    addOption(DefaultOptionCreator.t2Option().create());
+    addOption(M_OPTION, M_OPTION, "coefficient normalization factor, must be greater than 1", true);
+    
+    Map<String,List<String>> argMap = parseArguments(args);
+    if (argMap == null) {
+      return -1;
+    }
+    
+    Path input = getInputPath();
+    Path output = getOutputPath();
+    String measureClass = getOption(DefaultOptionCreator.DISTANCE_MEASURE_OPTION);
+    if (measureClass == null) {
+      measureClass = SquaredEuclideanDistanceMeasure.class.getName();
+    }
+    double convergenceDelta = Double.parseDouble(getOption(DefaultOptionCreator.CONVERGENCE_DELTA_OPTION));
+    int maxIterations = Integer.parseInt(getOption(DefaultOptionCreator.MAX_ITERATIONS_OPTION));
+    float fuzziness = Float.parseFloat(getOption(M_OPTION));
+    
+    if (hasOption(DefaultOptionCreator.OVERWRITE_OPTION)) {
+      HadoopUtil.delete(getConf(), output);
+    }
+    DistanceMeasure measure = ClassUtils.instantiateAs(measureClass, DistanceMeasure.class);
+    double t1 = Double.parseDouble(getOption(DefaultOptionCreator.T1_OPTION));
+    double t2 = Double.parseDouble(getOption(DefaultOptionCreator.T2_OPTION));
+    run(getConf(), input, output, measure, t1, t2, maxIterations, fuzziness, convergenceDelta);
+    return 0;
+  }
+  
+  /**
+   * Run the fuzzy k-means clustering job on an input dataset using the given distance measure, t1, t2 and iteration
+   * parameters. All output data will be written to the output directory, which will be deleted first if it already
+   * exists. The clustered points will reside in the path <output>/clusteredPoints. By default, the job expects a file
+   * containing synthetic_control.data, as obtained from
+   * http://archive.ics.uci.edu/ml/datasets/Synthetic+Control+Chart+Time+Series, to reside in a directory named
+   * "testdata", and writes output to a directory named "output".
+   *
+   * @param conf
+   *          the Configuration to use
+   * @param input
+   *          the Path to the input directory
+   * @param output
+   *          the Path to the output directory
+   * @param measure
+   *          the DistanceMeasure to use
+   * @param t1
+   *          the canopy T1 threshold
+   * @param t2
+   *          the canopy T2 threshold
+   * @param maxIterations
+   *          the maximum number of iterations
+   * @param fuzziness
+   *          the "m" fuzziness coefficient
+   * @param convergenceDelta
+   *          the convergence criterion for iterations
+   */
+  public static void run(Configuration conf, Path input, Path output, DistanceMeasure measure, double t1, double t2,
+      int maxIterations, float fuzziness, double convergenceDelta) throws Exception {
+    Path directoryContainingConvertedInput = new Path(output, DIRECTORY_CONTAINING_CONVERTED_INPUT);
+    log.info("Preparing Input");
+    InputDriver.runJob(input, directoryContainingConvertedInput, "org.apache.mahout.math.RandomAccessSparseVector");
+    log.info("Running Canopy to get initial clusters");
+    Path canopyOutput = new Path(output, "canopies");
+    CanopyDriver.run(conf, directoryContainingConvertedInput, canopyOutput, measure, t1, t2, false, 0.0, false);
+    log.info("Running FuzzyKMeans");
+    FuzzyKMeansDriver.run(directoryContainingConvertedInput, new Path(canopyOutput, "clusters-0-final"), output,
+        convergenceDelta, maxIterations, fuzziness, true, true, 0.0, false);
+    // run ClusterDumper
+    ClusterDumper clusterDumper = new ClusterDumper(new Path(output, "clusters-*-final"), new Path(output, "clusteredPoints"));
+    clusterDumper.printClusters(null);
+  }
+}
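
The public run() above can also be called directly; a minimal sketch using the
same defaults as main() (paths are hypothetical):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.mahout.clustering.syntheticcontrol.fuzzykmeans.Job;
    import org.apache.mahout.common.HadoopUtil;
    import org.apache.mahout.common.distance.EuclideanDistanceMeasure;

    public class FuzzyKMeansJobSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Path input = new Path("testdata");   // directory holding synthetic_control.data
        Path output = new Path("output");
        HadoopUtil.delete(conf, output);     // start from a clean output directory
        // t1=80/t2=55 seed the canopy pass, 10 iterations, fuzziness m=2.0, convergence delta 0.5.
        Job.run(conf, input, output, new EuclideanDistanceMeasure(), 80, 55, 10, 2.0f, 0.5);
      }
    }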

http://git-wip-us.apache.org/repos/asf/mahout/blob/410ed16a/community/mahout-mr/mr-examples/src/main/java/org/apache/mahout/clustering/syntheticcontrol/kmeans/Job.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/mr-examples/src/main/java/org/apache/mahout/clustering/syntheticcontrol/kmeans/Job.java b/community/mahout-mr/mr-examples/src/main/java/org/apache/mahout/clustering/syntheticcontrol/kmeans/Job.java
new file mode 100644
index 0000000..70c41fe
--- /dev/null
+++ b/community/mahout-mr/mr-examples/src/main/java/org/apache/mahout/clustering/syntheticcontrol/kmeans/Job.java
@@ -0,0 +1,187 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.clustering.syntheticcontrol.kmeans;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.mahout.clustering.Cluster;
+import org.apache.mahout.clustering.canopy.CanopyDriver;
+import org.apache.mahout.clustering.conversion.InputDriver;
+import org.apache.mahout.clustering.kmeans.KMeansDriver;
+import org.apache.mahout.clustering.kmeans.RandomSeedGenerator;
+import org.apache.mahout.common.AbstractJob;
+import org.apache.mahout.common.ClassUtils;
+import org.apache.mahout.common.HadoopUtil;
+import org.apache.mahout.common.commandline.DefaultOptionCreator;
+import org.apache.mahout.common.distance.DistanceMeasure;
+import org.apache.mahout.common.distance.EuclideanDistanceMeasure;
+import org.apache.mahout.common.distance.SquaredEuclideanDistanceMeasure;
+import org.apache.mahout.utils.clustering.ClusterDumper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public final class Job extends AbstractJob {
+  
+  private static final Logger log = LoggerFactory.getLogger(Job.class);
+  
+  private static final String DIRECTORY_CONTAINING_CONVERTED_INPUT = "data";
+  
+  private Job() {
+  }
+  
+  public static void main(String[] args) throws Exception {
+    if (args.length > 0) {
+      log.info("Running with only user-supplied arguments");
+      ToolRunner.run(new Configuration(), new Job(), args);
+    } else {
+      log.info("Running with default arguments");
+      Path output = new Path("output");
+      Configuration conf = new Configuration();
+      HadoopUtil.delete(conf, output);
+      run(conf, new Path("testdata"), output, new EuclideanDistanceMeasure(), 6, 0.5, 10);
+    }
+  }
+  
+  @Override
+  public int run(String[] args) throws Exception {
+    addInputOption();
+    addOutputOption();
+    addOption(DefaultOptionCreator.distanceMeasureOption().create());
+    addOption(DefaultOptionCreator.numClustersOption().create());
+    addOption(DefaultOptionCreator.t1Option().create());
+    addOption(DefaultOptionCreator.t2Option().create());
+    addOption(DefaultOptionCreator.convergenceOption().create());
+    addOption(DefaultOptionCreator.maxIterationsOption().create());
+    addOption(DefaultOptionCreator.overwriteOption().create());
+    
+    Map<String,List<String>> argMap = parseArguments(args);
+    if (argMap == null) {
+      return -1;
+    }
+    
+    Path input = getInputPath();
+    Path output = getOutputPath();
+    String measureClass = getOption(DefaultOptionCreator.DISTANCE_MEASURE_OPTION);
+    if (measureClass == null) {
+      measureClass = SquaredEuclideanDistanceMeasure.class.getName();
+    }
+    double convergenceDelta = Double.parseDouble(getOption(DefaultOptionCreator.CONVERGENCE_DELTA_OPTION));
+    int maxIterations = Integer.parseInt(getOption(DefaultOptionCreator.MAX_ITERATIONS_OPTION));
+    if (hasOption(DefaultOptionCreator.OVERWRITE_OPTION)) {
+      HadoopUtil.delete(getConf(), output);
+    }
+    DistanceMeasure measure = ClassUtils.instantiateAs(measureClass, DistanceMeasure.class);
+    if (hasOption(DefaultOptionCreator.NUM_CLUSTERS_OPTION)) {
+      int k = Integer.parseInt(getOption(DefaultOptionCreator.NUM_CLUSTERS_OPTION));
+      run(getConf(), input, output, measure, k, convergenceDelta, maxIterations);
+    } else {
+      double t1 = Double.parseDouble(getOption(DefaultOptionCreator.T1_OPTION));
+      double t2 = Double.parseDouble(getOption(DefaultOptionCreator.T2_OPTION));
+      run(getConf(), input, output, measure, t1, t2, convergenceDelta, maxIterations);
+    }
+    return 0;
+  }
+  
+  /**
+   * Run the k-means clustering job on an input dataset using the given number of clusters k and iteration
+   * parameters. All output data will be written to the output directory, which will be deleted first if it already
+   * exists. The clustered points will reside in the path <output>/clusteredPoints. By default, the job expects a file
+   * containing equal-length, space-delimited data to reside in a directory named "testdata", and writes output to a
+   * directory named "output".
+   *
+   * @param conf
+   *          the Configuration to use
+   * @param input
+   *          the Path to the input directory
+   * @param output
+   *          the Path to the output directory
+   * @param measure
+   *          the DistanceMeasure to use
+   * @param k
+   *          the number of clusters for k-means
+   * @param convergenceDelta
+   *          the convergence criterion for iterations
+   * @param maxIterations
+   *          the maximum number of iterations
+   */
+  public static void run(Configuration conf, Path input, Path output, DistanceMeasure measure, int k,
+      double convergenceDelta, int maxIterations) throws Exception {
+    Path directoryContainingConvertedInput = new Path(output, DIRECTORY_CONTAINING_CONVERTED_INPUT);
+    log.info("Preparing Input");
+    InputDriver.runJob(input, directoryContainingConvertedInput, "org.apache.mahout.math.RandomAccessSparseVector");
+    log.info("Running random seed to get initial clusters");
+    Path clusters = new Path(output, "random-seeds");
+    clusters = RandomSeedGenerator.buildRandom(conf, directoryContainingConvertedInput, clusters, k, measure);
+    log.info("Running KMeans with k = {}", k);
+    KMeansDriver.run(conf, directoryContainingConvertedInput, clusters, output, convergenceDelta,
+        maxIterations, true, 0.0, false);
+    // run ClusterDumper
+    Path outGlob = new Path(output, "clusters-*-final");
+    Path clusteredPoints = new Path(output,"clusteredPoints");
+    log.info("Dumping out clusters from clusters: {} and clusteredPoints: {}", outGlob, clusteredPoints);
+    ClusterDumper clusterDumper = new ClusterDumper(outGlob, clusteredPoints);
+    clusterDumper.printClusters(null);
+  }
+  
+  /**
+   * Run the k-means clustering job on an input dataset using the given distance measure, t1, t2 and iteration
+   * parameters. All output data will be written to the output directory, which will be deleted first if it already
+   * exists. The clustered points will reside in the path <output>/clusteredPoints. By default, the job expects a file
+   * containing synthetic_control.data, as obtained from
+   * http://archive.ics.uci.edu/ml/datasets/Synthetic+Control+Chart+Time+Series, to reside in a directory named
+   * "testdata", and writes output to a directory named "output".
+   *
+   * @param conf
+   *          the Configuration to use
+   * @param input
+   *          the Path to the input directory
+   * @param output
+   *          the Path to the output directory
+   * @param measure
+   *          the DistanceMeasure to use
+   * @param t1
+   *          the canopy T1 threshold
+   * @param t2
+   *          the canopy T2 threshold
+   * @param convergenceDelta
+   *          the convergence criterion for iterations
+   * @param maxIterations
+   *          the maximum number of iterations
+   */
+  public static void run(Configuration conf, Path input, Path output, DistanceMeasure measure, double t1, double t2,
+      double convergenceDelta, int maxIterations) throws Exception {
+    Path directoryContainingConvertedInput = new Path(output, DIRECTORY_CONTAINING_CONVERTED_INPUT);
+    log.info("Preparing Input");
+    InputDriver.runJob(input, directoryContainingConvertedInput, "org.apache.mahout.math.RandomAccessSparseVector");
+    log.info("Running Canopy to get initial clusters");
+    Path canopyOutput = new Path(output, "canopies");
+    CanopyDriver.run(conf, directoryContainingConvertedInput, canopyOutput, measure, t1, t2, false, 0.0, false);
+    log.info("Running KMeans");
+    KMeansDriver.run(conf, directoryContainingConvertedInput, new Path(canopyOutput, Cluster.INITIAL_CLUSTERS_DIR
+        + "-final"), output, convergenceDelta, maxIterations, true, 0.0, false);
+    // run ClusterDumper
+    ClusterDumper clusterDumper = new ClusterDumper(new Path(output, "clusters-*-final"), new Path(output,
+        "clusteredPoints"));
+    clusterDumper.printClusters(null);
+  }
+}
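
Likewise, the k-based run() above can be invoked directly; a minimal sketch
using the same defaults as main() (paths are hypothetical):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.mahout.clustering.syntheticcontrol.kmeans.Job;
    import org.apache.mahout.common.HadoopUtil;
    import org.apache.mahout.common.distance.EuclideanDistanceMeasure;

    public class KMeansJobSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Path input = new Path("testdata");   // directory holding the raw input data
        Path output = new Path("output");
        HadoopUtil.delete(conf, output);
        // k=6 random seeds, convergence delta 0.5, at most 10 iterations (the defaults main() uses).
        Job.run(conf, input, output, new EuclideanDistanceMeasure(), 6, 0.5, 10);
      }
    }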

http://git-wip-us.apache.org/repos/asf/mahout/blob/410ed16a/community/mahout-mr/mr-examples/src/main/java/org/apache/mahout/fpm/pfpgrowth/DeliciousTagsExample.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/mr-examples/src/main/java/org/apache/mahout/fpm/pfpgrowth/DeliciousTagsExample.java b/community/mahout-mr/mr-examples/src/main/java/org/apache/mahout/fpm/pfpgrowth/DeliciousTagsExample.java
new file mode 100644
index 0000000..92363e5
--- /dev/null
+++ b/community/mahout-mr/mr-examples/src/main/java/org/apache/mahout/fpm/pfpgrowth/DeliciousTagsExample.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.fpm.pfpgrowth;
+
+import java.io.IOException;
+
+import org.apache.commons.cli2.CommandLine;
+import org.apache.commons.cli2.Group;
+import org.apache.commons.cli2.Option;
+import org.apache.commons.cli2.OptionException;
+import org.apache.commons.cli2.builder.ArgumentBuilder;
+import org.apache.commons.cli2.builder.DefaultOptionBuilder;
+import org.apache.commons.cli2.builder.GroupBuilder;
+import org.apache.commons.cli2.commandline.Parser;
+import org.apache.mahout.common.CommandLineUtil;
+import org.apache.mahout.common.Parameters;
+import org.apache.mahout.common.commandline.DefaultOptionCreator;
+import org.apache.mahout.fpm.pfpgrowth.dataset.KeyBasedStringTupleGrouper;
+
+public final class DeliciousTagsExample {
+  private DeliciousTagsExample() { }
+  
+  public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
+    DefaultOptionBuilder obuilder = new DefaultOptionBuilder();
+    ArgumentBuilder abuilder = new ArgumentBuilder();
+    GroupBuilder gbuilder = new GroupBuilder();
+    Option inputDirOpt = DefaultOptionCreator.inputOption().create();
+    
+    Option outputOpt = DefaultOptionCreator.outputOption().create();
+    
+    Option helpOpt = DefaultOptionCreator.helpOption();
+    Option recordSplitterOpt = obuilder.withLongName("splitterPattern").withArgument(
+      abuilder.withName("splitterPattern").withMinimum(1).withMaximum(1).create()).withDescription(
+      "Regular Expression pattern used to split given line into fields."
+          + " Default value splits comma or tab separated fields."
+          + " Default Value: \"[ ,\\t]*\\t[ ,\\t]*\" ").withShortName("regex").create();
+    Option encodingOpt = obuilder.withLongName("encoding").withArgument(
+      abuilder.withName("encoding").withMinimum(1).withMaximum(1).create()).withDescription(
+      "(Optional) The file encoding.  Default value: UTF-8").withShortName("e").create();
+    Group group = gbuilder.withName("Options").withOption(inputDirOpt).withOption(outputOpt).withOption(
+      helpOpt).withOption(recordSplitterOpt).withOption(encodingOpt).create();
+    
+    try {
+      Parser parser = new Parser();
+      parser.setGroup(group);
+      CommandLine cmdLine = parser.parse(args);
+      
+      if (cmdLine.hasOption(helpOpt)) {
+        CommandLineUtil.printHelp(group);
+        return;
+      }
+      Parameters params = new Parameters();
+      if (cmdLine.hasOption(recordSplitterOpt)) {
+        params.set("splitPattern", (String) cmdLine.getValue(recordSplitterOpt));
+      }
+      
+      String encoding = "UTF-8";
+      if (cmdLine.hasOption(encodingOpt)) {
+        encoding = (String) cmdLine.getValue(encodingOpt);
+      }
+      params.set("encoding", encoding);
+      String inputDir = (String) cmdLine.getValue(inputDirOpt);
+      String outputDir = (String) cmdLine.getValue(outputOpt);
+      params.set("input", inputDir);
+      params.set("output", outputDir);
+      params.set("groupingFieldCount", "2");
+      params.set("gfield0", "1");
+      params.set("gfield1", "2");
+      params.set("selectedFieldCount", "1");
+      params.set("field0", "3");
+      params.set("maxTransactionLength", "100");
+      KeyBasedStringTupleGrouper.startJob(params);
+      
+    } catch (OptionException ex) {
+      CommandLineUtil.printHelp(group);
+    }
+    
+  }
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/410ed16a/community/mahout-mr/mr-examples/src/main/java/org/apache/mahout/fpm/pfpgrowth/dataset/KeyBasedStringTupleCombiner.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/mr-examples/src/main/java/org/apache/mahout/fpm/pfpgrowth/dataset/KeyBasedStringTupleCombiner.java b/community/mahout-mr/mr-examples/src/main/java/org/apache/mahout/fpm/pfpgrowth/dataset/KeyBasedStringTupleCombiner.java
new file mode 100644
index 0000000..4c80a31
--- /dev/null
+++ b/community/mahout-mr/mr-examples/src/main/java/org/apache/mahout/fpm/pfpgrowth/dataset/KeyBasedStringTupleCombiner.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.fpm.pfpgrowth.dataset;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.mahout.common.StringTuple;
+
+public class KeyBasedStringTupleCombiner extends Reducer<Text,StringTuple,Text,StringTuple> {
+  
+  @Override
+  protected void reduce(Text key,
+                        Iterable<StringTuple> values,
+                        Context context) throws IOException, InterruptedException {
+    Set<String> outputValues = new HashSet<>();
+    for (StringTuple value : values) {
+      outputValues.addAll(value.getEntries());
+    }
+    context.write(key, new StringTuple(outputValues));
+  }
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/410ed16a/community/mahout-mr/mr-examples/src/main/java/org/apache/mahout/fpm/pfpgrowth/dataset/KeyBasedStringTupleGrouper.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/mr-examples/src/main/java/org/apache/mahout/fpm/pfpgrowth/dataset/KeyBasedStringTupleGrouper.java b/community/mahout-mr/mr-examples/src/main/java/org/apache/mahout/fpm/pfpgrowth/dataset/KeyBasedStringTupleGrouper.java
new file mode 100644
index 0000000..cd17770
--- /dev/null
+++ b/community/mahout-mr/mr-examples/src/main/java/org/apache/mahout/fpm/pfpgrowth/dataset/KeyBasedStringTupleGrouper.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.fpm.pfpgrowth.dataset;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.mahout.common.HadoopUtil;
+import org.apache.mahout.common.Parameters;
+import org.apache.mahout.common.StringTuple;
+
+public final class KeyBasedStringTupleGrouper {
+  
+  private KeyBasedStringTupleGrouper() { }
+  
+  public static void startJob(Parameters params) throws IOException,
+                                                InterruptedException,
+                                                ClassNotFoundException {
+    Configuration conf = new Configuration();
+    
+    conf.set("job.parameters", params.toString());
+    conf.set("mapred.compress.map.output", "true");
+    conf.set("mapred.output.compression.type", "BLOCK");
+    conf.set("mapred.map.output.compression.codec", "org.apache.hadoop.io.compress.GzipCodec");
+    conf.set("io.serializations", "org.apache.hadoop.io.serializer.JavaSerialization,"
+                                  + "org.apache.hadoop.io.serializer.WritableSerialization");
+    
+    String input = params.get("input");
+    Job job = new Job(conf, "Generating dataset based on input " + input);
+    job.setJarByClass(KeyBasedStringTupleGrouper.class);
+    
+    job.setMapOutputKeyClass(Text.class);
+    job.setMapOutputValueClass(StringTuple.class);
+    
+    job.setOutputKeyClass(Text.class);
+    job.setOutputValueClass(Text.class);
+    
+    FileInputFormat.addInputPath(job, new Path(input));
+    Path outPath = new Path(params.get("output"));
+    FileOutputFormat.setOutputPath(job, outPath);
+    
+    HadoopUtil.delete(conf, outPath);
+
+    job.setInputFormatClass(TextInputFormat.class);
+    job.setMapperClass(KeyBasedStringTupleMapper.class);
+    job.setCombinerClass(KeyBasedStringTupleCombiner.class);
+    job.setReducerClass(KeyBasedStringTupleReducer.class);
+    job.setOutputFormatClass(TextOutputFormat.class);
+    
+    boolean succeeded = job.waitForCompletion(true);
+    if (!succeeded) {
+      throw new IllegalStateException("Job failed!");
+    }
+  }
+}
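
A sketch of calling startJob() directly with the same parameter keys that
DeliciousTagsExample sets; the input/output paths here are hypothetical.

    import org.apache.mahout.common.Parameters;
    import org.apache.mahout.fpm.pfpgrowth.dataset.KeyBasedStringTupleGrouper;

    public class GrouperSketch {
      public static void main(String[] args) throws Exception {
        Parameters params = new Parameters();
        params.set("input", "delicious/tags.tsv");         // hypothetical tab-separated input
        params.set("output", "delicious/grouped");
        params.set("splitPattern", "[ ,\\t]*\\t[ ,\\t]*");  // regex used by the mapper to split fields
        params.set("groupingFieldCount", "2");              // fields 1 and 2 become the map output key
        params.set("gfield0", "1");
        params.set("gfield1", "2");
        params.set("selectedFieldCount", "1");              // field 3 becomes the emitted value
        params.set("field0", "3");
        params.set("maxTransactionLength", "100");          // reducer chunks transactions at this length
        // startJob() serializes params into the "job.parameters" key read by the mapper/reducer setup().
        KeyBasedStringTupleGrouper.startJob(params);
      }
    }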

http://git-wip-us.apache.org/repos/asf/mahout/blob/410ed16a/community/mahout-mr/mr-examples/src/main/java/org/apache/mahout/fpm/pfpgrowth/dataset/KeyBasedStringTupleMapper.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/mr-examples/src/main/java/org/apache/mahout/fpm/pfpgrowth/dataset/KeyBasedStringTupleMapper.java b/community/mahout-mr/mr-examples/src/main/java/org/apache/mahout/fpm/pfpgrowth/dataset/KeyBasedStringTupleMapper.java
new file mode 100644
index 0000000..362d1ce
--- /dev/null
+++ b/community/mahout-mr/mr-examples/src/main/java/org/apache/mahout/fpm/pfpgrowth/dataset/KeyBasedStringTupleMapper.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.fpm.pfpgrowth.dataset;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.regex.Pattern;
+
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.mahout.common.Parameters;
+import org.apache.mahout.common.StringTuple;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Splits each input line using a {@link Pattern} and emits a key built from the groupingFields
+ * and a {@link StringTuple} value built from the selectedFields.
+ */
+public class KeyBasedStringTupleMapper extends Mapper<LongWritable,Text,Text,StringTuple> {
+  
+  private static final Logger log = LoggerFactory.getLogger(KeyBasedStringTupleMapper.class);
+  
+  private Pattern splitter;
+  
+  private int[] selectedFields;
+  
+  private int[] groupingFields;
+  
+  @Override
+  protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
+    String[] fields = splitter.split(value.toString());
+    if (fields.length != 4) {
+      log.info("{} {}", fields.length, value.toString());
+      context.getCounter("Map", "ERROR").increment(1);
+      return;
+    }
+    Collection<String> oKey = new ArrayList<>();
+    for (int groupingField : groupingFields) {
+      oKey.add(fields[groupingField]);
+      context.setStatus(fields[groupingField]);
+    }
+    
+    List<String> oValue = new ArrayList<>();
+    for (int selectedField : selectedFields) {
+      oValue.add(fields[selectedField]);
+    }
+    
+    context.write(new Text(oKey.toString()), new StringTuple(oValue));
+    
+  }
+  
+  @Override
+  protected void setup(Context context) throws IOException, InterruptedException {
+    super.setup(context);
+    Parameters params = new Parameters(context.getConfiguration().get("job.parameters", ""));
+    splitter = Pattern.compile(params.get("splitPattern", "[ \t]*\t[ \t]*"));
+    
+    int selectedFieldCount = Integer.parseInt(params.get("selectedFieldCount", "0"));
+    selectedFields = new int[selectedFieldCount];
+    for (int i = 0; i < selectedFieldCount; i++) {
+      selectedFields[i] = Integer.parseInt(params.get("field" + i, "0"));
+    }
+    
+    int groupingFieldCount = Integer.parseInt(params.get("groupingFieldCount", "0"));
+    groupingFields = new int[groupingFieldCount];
+    for (int i = 0; i < groupingFieldCount; i++) {
+      groupingFields[i] = Integer.parseInt(params.get("gfield" + i, "0"));
+    }
+    
+  }
+}
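
To make the field handling concrete, a small standalone sketch of what the
mapper does with one hypothetical four-field line, using the grouping and
selected field indices that DeliciousTagsExample configures:

    import java.util.ArrayList;
    import java.util.List;
    import java.util.regex.Pattern;

    public class MapperLogicSketch {
      public static void main(String[] args) {
        Pattern splitter = Pattern.compile("[ \t]*\t[ \t]*");          // same default as the mapper's setup()
        String line = "2010-01-01\tuserA\thttp://example.org\tmahout"; // hypothetical record
        String[] fields = splitter.split(line);                        // lines without exactly 4 fields are skipped

        int[] groupingFields = {1, 2};  // gfield0=1, gfield1=2
        int[] selectedFields = {3};     // field0=3

        List<String> key = new ArrayList<>();
        for (int g : groupingFields) {
          key.add(fields[g]);
        }
        List<String> value = new ArrayList<>();
        for (int s : selectedFields) {
          value.add(fields[s]);
        }
        // Prints: [userA, http://example.org] -> [mahout], i.e. the Text key and the StringTuple entries.
        System.out.println(key + " -> " + value);
      }
    }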

http://git-wip-us.apache.org/repos/asf/mahout/blob/410ed16a/community/mahout-mr/mr-examples/src/main/java/org/apache/mahout/fpm/pfpgrowth/dataset/KeyBasedStringTupleReducer.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/mr-examples/src/main/java/org/apache/mahout/fpm/pfpgrowth/dataset/KeyBasedStringTupleReducer.java b/community/mahout-mr/mr-examples/src/main/java/org/apache/mahout/fpm/pfpgrowth/dataset/KeyBasedStringTupleReducer.java
new file mode 100644
index 0000000..a7ef762
--- /dev/null
+++ b/community/mahout-mr/mr-examples/src/main/java/org/apache/mahout/fpm/pfpgrowth/dataset/KeyBasedStringTupleReducer.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.fpm.pfpgrowth.dataset;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashSet;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.mahout.common.Parameters;
+import org.apache.mahout.common.StringTuple;
+
+public class KeyBasedStringTupleReducer extends Reducer<Text,StringTuple,Text,Text> {
+  
+  private int maxTransactionLength = 100;
+  
+  @Override
+  protected void reduce(Text key, Iterable<StringTuple> values, Context context)
+    throws IOException, InterruptedException {
+    Collection<String> items = new HashSet<>();
+    
+    for (StringTuple value : values) {
+      for (String field : value.getEntries()) {
+        items.add(field);
+      }
+    }
+    if (items.size() > 1) {
+      int i = 0;
+      StringBuilder sb = new StringBuilder();
+      String sep = "";
+      for (String field : items) {
+        if (i % maxTransactionLength == 0) {
+          if (i != 0) {
+            context.write(null, new Text(sb.toString()));
+          }
+          sb.replace(0, sb.length(), "");
+          sep = "";
+        }
+        
+        sb.append(sep).append(field);
+        sep = "\t";
+        
+        i++;
+        
+      }
+      if (sb.length() > 0) {
+        context.write(null, new Text(sb.toString()));
+      }
+    }
+  }
+  
+  @Override
+  protected void setup(Context context) throws IOException, InterruptedException {
+    super.setup(context);
+    Parameters params = new Parameters(context.getConfiguration().get("job.parameters", ""));
+    maxTransactionLength = Integer.parseInt(params.get("maxTransactionLength", "100"));
+  }
+}