You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mahout.apache.org by ra...@apache.org on 2018/06/28 14:55:00 UTC
[32/51] [partial] mahout git commit: NO-JIRA Clean up MR refactor
http://git-wip-us.apache.org/repos/asf/mahout/blob/410ed16a/community/mahout-mr/mr-examples/src/main/java/org/apache/mahout/clustering/streaming/tools/IOUtils.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/mr-examples/src/main/java/org/apache/mahout/clustering/streaming/tools/IOUtils.java b/community/mahout-mr/mr-examples/src/main/java/org/apache/mahout/clustering/streaming/tools/IOUtils.java
new file mode 100644
index 0000000..bd1149b
--- /dev/null
+++ b/community/mahout-mr/mr-examples/src/main/java/org/apache/mahout/clustering/streaming/tools/IOUtils.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.clustering.streaming.tools;
+
+import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Iterables;
+import org.apache.mahout.clustering.iterator.ClusterWritable;
+import org.apache.mahout.clustering.streaming.mapreduce.CentroidWritable;
+import org.apache.mahout.math.Centroid;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.VectorWritable;
+
+/** Static helpers that lazily unwrap writables read from sequence files into Mahout math objects. */
+public final class IOUtils {
+
+  private IOUtils() {}
+
+  /**
+   * Converts CentroidWritable values in a sequence file into Centroids lazily.
+   * @param dirIterable the source iterable (comes from a SequenceFileDirIterable).
+   * @return an {@code Iterable<Centroid>} with a clone of the centroid held by each writable.
+   */
+  public static Iterable<Centroid> getCentroidsFromCentroidWritableIterable(Iterable<CentroidWritable> dirIterable) {
+    return Iterables.transform(dirIterable, new Function<CentroidWritable, Centroid>() {
+      @Override
+      public Centroid apply(CentroidWritable input) {
+        Preconditions.checkNotNull(input);
+        return input.getCentroid().clone();
+      }
+    });
+  }
+
+  /**
+   * Converts ClusterWritable values in a sequence file into Centroids lazily, giving each a running index from 0.
+   * @param dirIterable the source iterable (comes from a SequenceFileDirIterable).
+   * @return an {@code Iterable<Centroid>} built from each cluster's cloned center and its total observation count.
+   */
+  public static Iterable<Centroid> getCentroidsFromClusterWritableIterable(Iterable<ClusterWritable> dirIterable) {
+    return Iterables.transform(dirIterable, new Function<ClusterWritable, Centroid>() {
+      int numClusters = 0; // shared by every iterator of the result, so numbering continues on re-iteration
+      @Override
+      public Centroid apply(ClusterWritable input) {
+        Preconditions.checkNotNull(input);
+        return new Centroid(numClusters++, input.getValue().getCenter().clone(),
+            input.getValue().getTotalObservations());
+      }
+    });
+  }
+
+  /**
+   * Converts VectorWritable values in a sequence file into Vectors lazily.
+   * @param dirIterable the source iterable (comes from a SequenceFileDirIterable).
+   * @return an {@code Iterable<Vector>} with a clone of the vector held by each writable.
+   */
+  public static Iterable<Vector> getVectorsFromVectorWritableIterable(Iterable<VectorWritable> dirIterable) {
+    return Iterables.transform(dirIterable, new Function<VectorWritable, Vector>() {
+      @Override
+      public Vector apply(VectorWritable input) {
+        Preconditions.checkNotNull(input);
+        return input.get().clone();
+      }
+    });
+  }
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/410ed16a/community/mahout-mr/mr-examples/src/main/java/org/apache/mahout/clustering/syntheticcontrol/canopy/Job.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/mr-examples/src/main/java/org/apache/mahout/clustering/syntheticcontrol/canopy/Job.java b/community/mahout-mr/mr-examples/src/main/java/org/apache/mahout/clustering/syntheticcontrol/canopy/Job.java
new file mode 100644
index 0000000..083cd8c
--- /dev/null
+++ b/community/mahout-mr/mr-examples/src/main/java/org/apache/mahout/clustering/syntheticcontrol/canopy/Job.java
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.clustering.syntheticcontrol.canopy;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.mahout.clustering.canopy.CanopyDriver;
+import org.apache.mahout.clustering.conversion.InputDriver;
+import org.apache.mahout.common.AbstractJob;
+import org.apache.mahout.common.ClassUtils;
+import org.apache.mahout.common.HadoopUtil;
+import org.apache.mahout.common.commandline.DefaultOptionCreator;
+import org.apache.mahout.common.distance.DistanceMeasure;
+import org.apache.mahout.common.distance.EuclideanDistanceMeasure;
+import org.apache.mahout.utils.clustering.ClusterDumper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Deprecated
+public final class Job extends AbstractJob {
+
+  private static final Logger log = LoggerFactory.getLogger(Job.class);
+
+  private static final String DIRECTORY_CONTAINING_CONVERTED_INPUT = "data";
+
+  private Job() {
+  }
+
+  /**
+   * Runs the job through {@link ToolRunner} when arguments are supplied; otherwise clusters "testdata" into
+   * "output" (deleted first) with a Euclidean distance measure, t1 = 80 and t2 = 55.
+   */
+  public static void main(String[] args) throws Exception {
+    if (args.length > 0) {
+      log.info("Running with only user-supplied arguments");
+      ToolRunner.run(new Configuration(), new Job(), args);
+    } else {
+      log.info("Running with default arguments");
+      Path output = new Path("output");
+      Configuration conf = new Configuration();
+      HadoopUtil.delete(conf, output);
+      run(conf, new Path("testdata"), output, new EuclideanDistanceMeasure(), 80, 55);
+    }
+  }
+
+  /**
+   * Run the canopy clustering job on an input dataset using the given distance measure, t1 and t2 parameters.
+   * The input is converted into <output>/data, canopy clustering writes to the output directory, and the
+   * clusters are finally dumped from <output>/clusters-0-final and <output>/clusteredPoints. By default, the
+   * job expects that a file containing synthetic_control.data as obtained from
+   * http://archive.ics.uci.edu/ml/datasets/Synthetic+Control+Chart+Time+Series resides in a directory named
+   * "testdata", and writes output to a directory named "output".
+   *
+   * @param conf the Configuration handed to the canopy step
+   * @param input the Path of the input directory
+   * @param output the Path of the output directory
+   * @param measure the DistanceMeasure to use
+   * @param t1 the canopy T1 threshold
+   * @param t2 the canopy T2 threshold
+   */
+  private static void run(Configuration conf, Path input, Path output, DistanceMeasure measure,
+                          double t1, double t2) throws Exception {
+    Path directoryContainingConvertedInput = new Path(output, DIRECTORY_CONTAINING_CONVERTED_INPUT);
+    InputDriver.runJob(input, directoryContainingConvertedInput, "org.apache.mahout.math.RandomAccessSparseVector");
+    // Reuse the caller's configuration instead of a fresh one, so the whole run shares the same settings.
+    CanopyDriver.run(conf, directoryContainingConvertedInput, output, measure, t1, t2, true, 0.0, false);
+    // run ClusterDumper
+    ClusterDumper clusterDumper = new ClusterDumper(new Path(output, "clusters-0-final"),
+        new Path(output, "clusteredPoints"));
+    clusterDumper.printClusters(null);
+  }
+
+  /**
+   * Parses the command-line options and runs canopy clustering with them.
+   *
+   * @return 0 after a completed run, -1 if {@code parseArguments} returned null
+   */
+  @Override
+  public int run(String[] args) throws Exception {
+    addInputOption();
+    addOutputOption();
+    addOption(DefaultOptionCreator.distanceMeasureOption().create());
+    addOption(DefaultOptionCreator.t1Option().create());
+    addOption(DefaultOptionCreator.t2Option().create());
+    addOption(DefaultOptionCreator.overwriteOption().create());
+
+    Map<String, List<String>> argMap = parseArguments(args);
+    if (argMap == null) {
+      return -1;
+    }
+
+    Path input = getInputPath();
+    Path output = getOutputPath();
+    Configuration conf = getConf();
+    if (hasOption(DefaultOptionCreator.OVERWRITE_OPTION)) {
+      HadoopUtil.delete(conf, output);
+    }
+    String measureClass = getOption(DefaultOptionCreator.DISTANCE_MEASURE_OPTION);
+    double t1 = Double.parseDouble(getOption(DefaultOptionCreator.T1_OPTION));
+    double t2 = Double.parseDouble(getOption(DefaultOptionCreator.T2_OPTION));
+    DistanceMeasure measure = ClassUtils.instantiateAs(measureClass, DistanceMeasure.class);
+
+    run(conf, input, output, measure, t1, t2);
+    return 0;
+  }
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/410ed16a/community/mahout-mr/mr-examples/src/main/java/org/apache/mahout/clustering/syntheticcontrol/fuzzykmeans/Job.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/mr-examples/src/main/java/org/apache/mahout/clustering/syntheticcontrol/fuzzykmeans/Job.java b/community/mahout-mr/mr-examples/src/main/java/org/apache/mahout/clustering/syntheticcontrol/fuzzykmeans/Job.java
new file mode 100644
index 0000000..43beb78
--- /dev/null
+++ b/community/mahout-mr/mr-examples/src/main/java/org/apache/mahout/clustering/syntheticcontrol/fuzzykmeans/Job.java
@@ -0,0 +1,144 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.clustering.syntheticcontrol.fuzzykmeans;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.cli2.builder.ArgumentBuilder;
+import org.apache.commons.cli2.builder.DefaultOptionBuilder;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.mahout.clustering.canopy.CanopyDriver;
+import org.apache.mahout.clustering.conversion.InputDriver;
+import org.apache.mahout.clustering.fuzzykmeans.FuzzyKMeansDriver;
+import org.apache.mahout.common.AbstractJob;
+import org.apache.mahout.common.ClassUtils;
+import org.apache.mahout.common.HadoopUtil;
+import org.apache.mahout.common.commandline.DefaultOptionCreator;
+import org.apache.mahout.common.distance.DistanceMeasure;
+import org.apache.mahout.common.distance.EuclideanDistanceMeasure;
+import org.apache.mahout.common.distance.SquaredEuclideanDistanceMeasure;
+import org.apache.mahout.utils.clustering.ClusterDumper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public final class Job extends AbstractJob {
+
+  private static final Logger log = LoggerFactory.getLogger(Job.class);
+
+  private static final String DIRECTORY_CONTAINING_CONVERTED_INPUT = "data";
+
+  private static final String M_OPTION = FuzzyKMeansDriver.M_OPTION;
+
+  private Job() {
+  }
+
+  /**
+   * Runs the job through {@link ToolRunner} when arguments are supplied; otherwise clusters "testdata" into
+   * "output" (deleted first) with t1 = 80, t2 = 55, 10 iterations, m = 2.0 and a convergence delta of 0.5.
+   */
+  public static void main(String[] args) throws Exception {
+    if (args.length > 0) {
+      log.info("Running with only user-supplied arguments");
+      ToolRunner.run(new Configuration(), new Job(), args);
+    } else {
+      log.info("Running with default arguments");
+      Path output = new Path("output");
+      Configuration conf = new Configuration();
+      HadoopUtil.delete(conf, output);
+      run(conf, new Path("testdata"), output, new EuclideanDistanceMeasure(), 80, 55, 10, 2.0f, 0.5);
+    }
+  }
+
+  /**
+   * Parses the command-line options and runs canopy-seeded fuzzy k-means with them.
+   *
+   * @return 0 after a completed run, -1 if {@code parseArguments} returned null
+   */
+  @Override
+  public int run(String[] args) throws Exception {
+    addInputOption();
+    addOutputOption();
+    addOption(DefaultOptionCreator.distanceMeasureOption().create());
+    addOption(DefaultOptionCreator.convergenceOption().create());
+    addOption(DefaultOptionCreator.maxIterationsOption().create());
+    addOption(DefaultOptionCreator.overwriteOption().create());
+    addOption(DefaultOptionCreator.t1Option().create());
+    addOption(DefaultOptionCreator.t2Option().create());
+    addOption(M_OPTION, M_OPTION, "coefficient normalization factor, must be greater than 1", true);
+
+    Map<String,List<String>> argMap = parseArguments(args);
+    if (argMap == null) {
+      return -1;
+    }
+
+    Path input = getInputPath();
+    Path output = getOutputPath();
+    String measureClass = getOption(DefaultOptionCreator.DISTANCE_MEASURE_OPTION);
+    if (measureClass == null) {
+      measureClass = SquaredEuclideanDistanceMeasure.class.getName();
+    }
+    double convergenceDelta = Double.parseDouble(getOption(DefaultOptionCreator.CONVERGENCE_DELTA_OPTION));
+    int maxIterations = Integer.parseInt(getOption(DefaultOptionCreator.MAX_ITERATIONS_OPTION));
+    float fuzziness = Float.parseFloat(getOption(M_OPTION));
+    if (hasOption(DefaultOptionCreator.OVERWRITE_OPTION)) {
+      HadoopUtil.delete(getConf(), output);
+    }
+    DistanceMeasure measure = ClassUtils.instantiateAs(measureClass, DistanceMeasure.class);
+    double t1 = Double.parseDouble(getOption(DefaultOptionCreator.T1_OPTION));
+    double t2 = Double.parseDouble(getOption(DefaultOptionCreator.T2_OPTION));
+    run(getConf(), input, output, measure, t1, t2, maxIterations, fuzziness, convergenceDelta);
+    return 0;
+  }
+
+  /**
+   * Run the fuzzy kmeans clustering job on an input dataset using the given distance measure, t1, t2, iteration
+   * and fuzziness parameters. The input is converted into <output>/data, canopy clustering writes the initial
+   * clusters under <output>/canopies, and the result is dumped from <output>/clusters-*-final and
+   * <output>/clusteredPoints. By default, the job expects that a file containing synthetic_control.data as
+   * obtained from http://archive.ics.uci.edu/ml/datasets/Synthetic+Control+Chart+Time+Series resides in a
+   * directory named "testdata", and writes output to a directory named "output".
+   *
+   * @param conf the Configuration handed to the canopy step
+   * @param input the Path of the input directory
+   * @param output the Path of the output directory
+   * @param measure the DistanceMeasure handed to the canopy step
+   * @param t1 the canopy T1 threshold
+   * @param t2 the canopy T2 threshold
+   * @param maxIterations the int maximum number of iterations
+   * @param fuzziness the float "m" fuzziness coefficient
+   * @param convergenceDelta the double convergence criteria for iterations
+   */
+  public static void run(Configuration conf, Path input, Path output, DistanceMeasure measure, double t1, double t2,
+                         int maxIterations, float fuzziness, double convergenceDelta) throws Exception {
+    Path directoryContainingConvertedInput = new Path(output, DIRECTORY_CONTAINING_CONVERTED_INPUT);
+    log.info("Preparing Input");
+    InputDriver.runJob(input, directoryContainingConvertedInput, "org.apache.mahout.math.RandomAccessSparseVector");
+    log.info("Running Canopy to get initial clusters");
+    Path canopyOutput = new Path(output, "canopies");
+    CanopyDriver.run(conf, directoryContainingConvertedInput, canopyOutput, measure, t1, t2, false, 0.0, false);
+    log.info("Running FuzzyKMeans");
+    FuzzyKMeansDriver.run(directoryContainingConvertedInput, new Path(canopyOutput, "clusters-0-final"), output,
+        convergenceDelta, maxIterations, fuzziness, true, true, 0.0, false);
+    // run ClusterDumper
+    ClusterDumper clusterDumper = new ClusterDumper(new Path(output, "clusters-*-final"), new Path(output, "clusteredPoints"));
+    clusterDumper.printClusters(null);
+  }
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/410ed16a/community/mahout-mr/mr-examples/src/main/java/org/apache/mahout/clustering/syntheticcontrol/kmeans/Job.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/mr-examples/src/main/java/org/apache/mahout/clustering/syntheticcontrol/kmeans/Job.java b/community/mahout-mr/mr-examples/src/main/java/org/apache/mahout/clustering/syntheticcontrol/kmeans/Job.java
new file mode 100644
index 0000000..70c41fe
--- /dev/null
+++ b/community/mahout-mr/mr-examples/src/main/java/org/apache/mahout/clustering/syntheticcontrol/kmeans/Job.java
@@ -0,0 +1,187 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.clustering.syntheticcontrol.kmeans;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.mahout.clustering.Cluster;
+import org.apache.mahout.clustering.canopy.CanopyDriver;
+import org.apache.mahout.clustering.conversion.InputDriver;
+import org.apache.mahout.clustering.kmeans.KMeansDriver;
+import org.apache.mahout.clustering.kmeans.RandomSeedGenerator;
+import org.apache.mahout.common.AbstractJob;
+import org.apache.mahout.common.ClassUtils;
+import org.apache.mahout.common.HadoopUtil;
+import org.apache.mahout.common.commandline.DefaultOptionCreator;
+import org.apache.mahout.common.distance.DistanceMeasure;
+import org.apache.mahout.common.distance.EuclideanDistanceMeasure;
+import org.apache.mahout.common.distance.SquaredEuclideanDistanceMeasure;
+import org.apache.mahout.utils.clustering.ClusterDumper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public final class Job extends AbstractJob {
+
+  private static final Logger log = LoggerFactory.getLogger(Job.class);
+
+  /** Name of the sub-directory of the output path that receives the converted input. */
+  private static final String DIRECTORY_CONTAINING_CONVERTED_INPUT = "data";
+
+  private Job() {
+  }
+
+  /**
+   * Runs the job through {@link ToolRunner} when arguments are supplied; otherwise clusters "testdata" into
+   * "output" (deleted first) with a Euclidean distance measure, k = 6, a convergence delta of 0.5 and 10 iterations.
+   */
+  public static void main(String[] args) throws Exception {
+    if (args.length > 0) {
+      log.info("Running with only user-supplied arguments");
+      ToolRunner.run(new Configuration(), new Job(), args);
+    } else {
+      log.info("Running with default arguments");
+      Path output = new Path("output");
+      Configuration conf = new Configuration();
+      HadoopUtil.delete(conf, output);
+      run(conf, new Path("testdata"), output, new EuclideanDistanceMeasure(), 6, 0.5, 10);
+    }
+  }
+
+  /**
+   * Parses the command-line options and runs k-means with them. Initial clusters come from random seeding when
+   * the number-of-clusters option is present, and from canopy clustering with the t1/t2 options otherwise.
+   *
+   * @return 0 after a completed run, -1 if {@code parseArguments} returned null
+   */
+  @Override
+  public int run(String[] args) throws Exception {
+    addInputOption();
+    addOutputOption();
+    addOption(DefaultOptionCreator.distanceMeasureOption().create());
+    addOption(DefaultOptionCreator.numClustersOption().create());
+    addOption(DefaultOptionCreator.t1Option().create());
+    addOption(DefaultOptionCreator.t2Option().create());
+    addOption(DefaultOptionCreator.convergenceOption().create());
+    addOption(DefaultOptionCreator.maxIterationsOption().create());
+    addOption(DefaultOptionCreator.overwriteOption().create());
+
+    Map<String,List<String>> argMap = parseArguments(args);
+    if (argMap == null) {
+      return -1;
+    }
+
+    Path input = getInputPath();
+    Path output = getOutputPath();
+    String measureClass = getOption(DefaultOptionCreator.DISTANCE_MEASURE_OPTION);
+    if (measureClass == null) {
+      measureClass = SquaredEuclideanDistanceMeasure.class.getName();
+    }
+    double convergenceDelta = Double.parseDouble(getOption(DefaultOptionCreator.CONVERGENCE_DELTA_OPTION));
+    int maxIterations = Integer.parseInt(getOption(DefaultOptionCreator.MAX_ITERATIONS_OPTION));
+
+    if (hasOption(DefaultOptionCreator.OVERWRITE_OPTION)) {
+      HadoopUtil.delete(getConf(), output);
+    }
+    DistanceMeasure measure = ClassUtils.instantiateAs(measureClass, DistanceMeasure.class);
+    // An explicit k selects random seeding; without it the canopy thresholds provide the initial clusters.
+    if (hasOption(DefaultOptionCreator.NUM_CLUSTERS_OPTION)) {
+      int k = Integer.parseInt(getOption(DefaultOptionCreator.NUM_CLUSTERS_OPTION));
+      run(getConf(), input, output, measure, k, convergenceDelta, maxIterations);
+    } else {
+      double t1 = Double.parseDouble(getOption(DefaultOptionCreator.T1_OPTION));
+      double t2 = Double.parseDouble(getOption(DefaultOptionCreator.T2_OPTION));
+      run(getConf(), input, output, measure, t1, t2, convergenceDelta, maxIterations);
+    }
+    return 0;
+  }
+
+  /**
+   * Run the kmeans clustering job on an input dataset using the given number of clusters k and iteration
+   * parameters. The input is converted into <output>/data, the initial clusters are built by
+   * RandomSeedGenerator under <output>/random-seeds, and the result is dumped from <output>/clusters-*-final
+   * and <output>/clusteredPoints. The output directory is not cleared here; callers delete it beforehand when
+   * required. By default, the job expects a file containing equal length space delimited data that resides in
+   * a directory named "testdata", and writes output to a directory named "output".
+   *
+   * @param conf the Configuration to use
+   * @param input the Path of the input directory
+   * @param output the Path of the output directory
+   * @param measure the DistanceMeasure to use
+   * @param k the number of clusters in Kmeans
+   * @param convergenceDelta the double convergence criteria for iterations
+   * @param maxIterations the int maximum number of iterations
+   */
+  public static void run(Configuration conf, Path input, Path output, DistanceMeasure measure, int k,
+                         double convergenceDelta, int maxIterations) throws Exception {
+    Path directoryContainingConvertedInput = new Path(output, DIRECTORY_CONTAINING_CONVERTED_INPUT);
+    log.info("Preparing Input");
+    InputDriver.runJob(input, directoryContainingConvertedInput, "org.apache.mahout.math.RandomAccessSparseVector");
+    log.info("Running random seed to get initial clusters");
+    Path clusters = new Path(output, "random-seeds");
+    clusters = RandomSeedGenerator.buildRandom(conf, directoryContainingConvertedInput, clusters, k, measure);
+    log.info("Running KMeans with k = {}", k);
+    KMeansDriver.run(conf, directoryContainingConvertedInput, clusters, output, convergenceDelta,
+        maxIterations, true, 0.0, false);
+    // run ClusterDumper
+    Path outGlob = new Path(output, "clusters-*-final");
+    Path clusteredPoints = new Path(output, "clusteredPoints");
+    log.info("Dumping out clusters from clusters: {} and clusteredPoints: {}", outGlob, clusteredPoints);
+    ClusterDumper clusterDumper = new ClusterDumper(outGlob, clusteredPoints);
+    clusterDumper.printClusters(null);
+  }
+
+  /**
+   * Run the kmeans clustering job on an input dataset using the given distance measure, t1, t2 and iteration
+   * parameters. The input is converted into <output>/data, canopy clustering writes the initial clusters under
+   * <output>/canopies, and the result is dumped from <output>/clusters-*-final and <output>/clusteredPoints.
+   * The output directory is not cleared here; callers delete it beforehand when required. By default, the job
+   * expects that a file containing synthetic_control.data as obtained from
+   * http://archive.ics.uci.edu/ml/datasets/Synthetic+Control+Chart+Time+Series resides in a directory named
+   * "testdata", and writes output to a directory named "output".
+   *
+   * @param conf the Configuration to use for the canopy and k-means steps
+   * @param input the Path of the input directory
+   * @param output the Path of the output directory
+   * @param measure the DistanceMeasure handed to the canopy step
+   * @param t1 the canopy T1 threshold
+   * @param t2 the canopy T2 threshold
+   * @param convergenceDelta the double convergence criteria for iterations
+   * @param maxIterations the int maximum number of iterations
+   */
+  public static void run(Configuration conf, Path input, Path output, DistanceMeasure measure, double t1, double t2,
+                         double convergenceDelta, int maxIterations) throws Exception {
+    Path directoryContainingConvertedInput = new Path(output, DIRECTORY_CONTAINING_CONVERTED_INPUT);
+    log.info("Preparing Input");
+    InputDriver.runJob(input, directoryContainingConvertedInput, "org.apache.mahout.math.RandomAccessSparseVector");
+    log.info("Running Canopy to get initial clusters");
+    Path canopyOutput = new Path(output, "canopies");
+    // Same configuration as the k-means step below, rather than a fresh one that ignores the caller's settings.
+    CanopyDriver.run(conf, directoryContainingConvertedInput, canopyOutput, measure, t1, t2, false, 0.0, false);
+    log.info("Running KMeans");
+    KMeansDriver.run(conf, directoryContainingConvertedInput, new Path(canopyOutput, Cluster.INITIAL_CLUSTERS_DIR
+        + "-final"), output, convergenceDelta, maxIterations, true, 0.0, false);
+    // run ClusterDumper
+    ClusterDumper clusterDumper = new ClusterDumper(new Path(output, "clusters-*-final"), new Path(output,
+        "clusteredPoints"));
+    clusterDumper.printClusters(null);
+  }
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/410ed16a/community/mahout-mr/mr-examples/src/main/java/org/apache/mahout/fpm/pfpgrowth/DeliciousTagsExample.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/mr-examples/src/main/java/org/apache/mahout/fpm/pfpgrowth/DeliciousTagsExample.java b/community/mahout-mr/mr-examples/src/main/java/org/apache/mahout/fpm/pfpgrowth/DeliciousTagsExample.java
new file mode 100644
index 0000000..92363e5
--- /dev/null
+++ b/community/mahout-mr/mr-examples/src/main/java/org/apache/mahout/fpm/pfpgrowth/DeliciousTagsExample.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.fpm.pfpgrowth;
+
+import java.io.IOException;
+
+import org.apache.commons.cli2.CommandLine;
+import org.apache.commons.cli2.Group;
+import org.apache.commons.cli2.Option;
+import org.apache.commons.cli2.OptionException;
+import org.apache.commons.cli2.builder.ArgumentBuilder;
+import org.apache.commons.cli2.builder.DefaultOptionBuilder;
+import org.apache.commons.cli2.builder.GroupBuilder;
+import org.apache.commons.cli2.commandline.Parser;
+import org.apache.mahout.common.CommandLineUtil;
+import org.apache.mahout.common.Parameters;
+import org.apache.mahout.common.commandline.DefaultOptionCreator;
+import org.apache.mahout.fpm.pfpgrowth.dataset.KeyBasedStringTupleGrouper;
+
+/** Command-line driver that configures and starts a {@link KeyBasedStringTupleGrouper} job. */
+public final class DeliciousTagsExample {
+  private DeliciousTagsExample() { }
+
+  /**
+   * Parses the input, output, help, splitter-pattern and encoding options, then starts the grouping job.
+   * Usage help is printed instead when help is requested or the arguments cannot be parsed.
+   */
+  public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
+    DefaultOptionBuilder optionBuilder = new DefaultOptionBuilder();
+    ArgumentBuilder argumentBuilder = new ArgumentBuilder();
+    GroupBuilder groupBuilder = new GroupBuilder();
+    Option inputOption = DefaultOptionCreator.inputOption().create();
+    Option outputOption = DefaultOptionCreator.outputOption().create();
+    Option helpOption = DefaultOptionCreator.helpOption();
+    Option splitterOption = optionBuilder.withLongName("splitterPattern")
+        .withArgument(argumentBuilder.withName("splitterPattern").withMinimum(1).withMaximum(1).create())
+        .withDescription("Regular Expression pattern used to split given line into fields."
+            + " Default value splits comma or tab separated fields."
+            + " Default Value: \"[ ,\\t]*\\t[ ,\\t]*\" ")
+        .withShortName("regex").create();
+    Option encodingOption = optionBuilder.withLongName("encoding")
+        .withArgument(argumentBuilder.withName("encoding").withMinimum(1).withMaximum(1).create())
+        .withDescription("(Optional) The file encoding. Default value: UTF-8")
+        .withShortName("e").create();
+    Group options = groupBuilder.withName("Options")
+        .withOption(inputOption).withOption(outputOption).withOption(helpOption)
+        .withOption(splitterOption).withOption(encodingOption).create();
+
+    try {
+      Parser parser = new Parser();
+      parser.setGroup(options);
+      CommandLine parsed = parser.parse(args);
+      if (parsed.hasOption(helpOption)) {
+        CommandLineUtil.printHelp(options);
+        return;
+      }
+      Parameters jobParams = new Parameters();
+      if (parsed.hasOption(splitterOption)) {
+        jobParams.set("splitPattern", (String) parsed.getValue(splitterOption));
+      }
+      String encoding = parsed.hasOption(encodingOption) ? (String) parsed.getValue(encodingOption) : "UTF-8";
+      jobParams.set("encoding", encoding);
+      String inputDir = (String) parsed.getValue(inputOption);
+      String outputDir = (String) parsed.getValue(outputOption);
+      jobParams.set("input", inputDir);
+      jobParams.set("output", outputDir);
+      // The grouping and selection fields are fixed by this example.
+      jobParams.set("groupingFieldCount", "2");
+      jobParams.set("gfield0", "1");
+      jobParams.set("gfield1", "2");
+      jobParams.set("selectedFieldCount", "1");
+      jobParams.set("field0", "3");
+      jobParams.set("maxTransactionLength", "100");
+      KeyBasedStringTupleGrouper.startJob(jobParams);
+    } catch (OptionException ex) {
+      CommandLineUtil.printHelp(options);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/410ed16a/community/mahout-mr/mr-examples/src/main/java/org/apache/mahout/fpm/pfpgrowth/dataset/KeyBasedStringTupleCombiner.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/mr-examples/src/main/java/org/apache/mahout/fpm/pfpgrowth/dataset/KeyBasedStringTupleCombiner.java b/community/mahout-mr/mr-examples/src/main/java/org/apache/mahout/fpm/pfpgrowth/dataset/KeyBasedStringTupleCombiner.java
new file mode 100644
index 0000000..4c80a31
--- /dev/null
+++ b/community/mahout-mr/mr-examples/src/main/java/org/apache/mahout/fpm/pfpgrowth/dataset/KeyBasedStringTupleCombiner.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.fpm.pfpgrowth.dataset;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.mahout.common.StringTuple;
+
+ /**
+  * Combiner that merges every {@link StringTuple} received for a key into a single tuple
+  * holding the distinct entries of all of them.
+  */
+ public class KeyBasedStringTupleCombiner extends Reducer<Text,StringTuple,Text,StringTuple> {
+
+   /**
+    * Collects the entries of all incoming tuples into a set and writes that set back out,
+    * under the unchanged key, as one tuple.
+    *
+    * @param key the grouping key, written through as-is
+    * @param values the tuples emitted for {@code key}
+    * @param context the task context that receives the merged tuple
+    */
+   @Override
+   protected void reduce(Text key,
+                         Iterable<StringTuple> values,
+                         Context context) throws IOException, InterruptedException {
+     Set<String> distinctEntries = new HashSet<>();
+     for (StringTuple tuple : values) {
+       for (String entry : tuple.getEntries()) {
+         distinctEntries.add(entry);
+       }
+     }
+     StringTuple merged = new StringTuple(distinctEntries);
+     context.write(key, merged);
+   }
+ }
http://git-wip-us.apache.org/repos/asf/mahout/blob/410ed16a/community/mahout-mr/mr-examples/src/main/java/org/apache/mahout/fpm/pfpgrowth/dataset/KeyBasedStringTupleGrouper.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/mr-examples/src/main/java/org/apache/mahout/fpm/pfpgrowth/dataset/KeyBasedStringTupleGrouper.java b/community/mahout-mr/mr-examples/src/main/java/org/apache/mahout/fpm/pfpgrowth/dataset/KeyBasedStringTupleGrouper.java
new file mode 100644
index 0000000..cd17770
--- /dev/null
+++ b/community/mahout-mr/mr-examples/src/main/java/org/apache/mahout/fpm/pfpgrowth/dataset/KeyBasedStringTupleGrouper.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.fpm.pfpgrowth.dataset;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.mahout.common.HadoopUtil;
+import org.apache.mahout.common.Parameters;
+import org.apache.mahout.common.StringTuple;
+
+ /**
+  * Driver that configures and runs the MapReduce job made of {@link KeyBasedStringTupleMapper},
+  * {@link KeyBasedStringTupleCombiner} and {@link KeyBasedStringTupleReducer}.
+  */
+ public final class KeyBasedStringTupleGrouper {
+
+   private KeyBasedStringTupleGrouper() { }
+
+   /**
+    * Builds the job from {@code params}, runs it and blocks until it finishes.
+    *
+    * <p>The whole parameter set is serialized into the {@code job.parameters} configuration
+    * entry so the mapper and reducer can read it back. The {@code input} and {@code output}
+    * parameters give the job's input and output paths; the output path is deleted before the
+    * job is submitted.
+    *
+    * @param params job parameters; {@code input} and {@code output} are read here
+    * @throws IOException if the job cannot be set up or the output path cannot be deleted
+    * @throws InterruptedException if the wait for job completion is interrupted
+    * @throws ClassNotFoundException if a class required by the job cannot be loaded
+    * @throws IllegalStateException if the job completes unsuccessfully
+    */
+   public static void startJob(Parameters params) throws IOException,
+                                                  InterruptedException,
+                                                  ClassNotFoundException {
+     Configuration conf = new Configuration();
+
+     conf.set("job.parameters", params.toString());
+     conf.set("mapred.compress.map.output", "true");
+     conf.set("mapred.output.compression.type", "BLOCK");
+     conf.set("mapred.map.output.compression.codec", "org.apache.hadoop.io.compress.GzipCodec");
+     conf.set("io.serializations", "org.apache.hadoop.io.serializer.JavaSerialization,"
+                                   + "org.apache.hadoop.io.serializer.WritableSerialization");
+
+     String input = params.get("input");
+     // Separate the fixed text from the path; the original fused them ("...inputfoo").
+     Job job = new Job(conf, "Generating dataset based from input: " + input);
+     job.setJarByClass(KeyBasedStringTupleGrouper.class);
+
+     job.setMapOutputKeyClass(Text.class);
+     job.setMapOutputValueClass(StringTuple.class);
+
+     job.setOutputKeyClass(Text.class);
+     job.setOutputValueClass(Text.class);
+
+     FileInputFormat.addInputPath(job, new Path(input));
+     Path outPath = new Path(params.get("output"));
+     FileOutputFormat.setOutputPath(job, outPath);
+
+     // Remove any previous output before the job is submitted.
+     HadoopUtil.delete(conf, outPath);
+
+     job.setInputFormatClass(TextInputFormat.class);
+     job.setMapperClass(KeyBasedStringTupleMapper.class);
+     job.setCombinerClass(KeyBasedStringTupleCombiner.class);
+     job.setReducerClass(KeyBasedStringTupleReducer.class);
+     job.setOutputFormatClass(TextOutputFormat.class);
+
+     boolean succeeded = job.waitForCompletion(true);
+     if (!succeeded) {
+       throw new IllegalStateException("Job failed!");
+     }
+   }
+ }
http://git-wip-us.apache.org/repos/asf/mahout/blob/410ed16a/community/mahout-mr/mr-examples/src/main/java/org/apache/mahout/fpm/pfpgrowth/dataset/KeyBasedStringTupleMapper.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/mr-examples/src/main/java/org/apache/mahout/fpm/pfpgrowth/dataset/KeyBasedStringTupleMapper.java b/community/mahout-mr/mr-examples/src/main/java/org/apache/mahout/fpm/pfpgrowth/dataset/KeyBasedStringTupleMapper.java
new file mode 100644
index 0000000..362d1ce
--- /dev/null
+++ b/community/mahout-mr/mr-examples/src/main/java/org/apache/mahout/fpm/pfpgrowth/dataset/KeyBasedStringTupleMapper.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.fpm.pfpgrowth.dataset;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.regex.Pattern;
+
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.mahout.common.Parameters;
+import org.apache.mahout.common.StringTuple;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Splits the line using a {@link Pattern} and outputs key as given by the groupingFields
+ *
+ */
+public class KeyBasedStringTupleMapper extends Mapper<LongWritable,Text,Text,StringTuple> {
+
+ private static final Logger log = LoggerFactory.getLogger(KeyBasedStringTupleMapper.class);
+
+ private Pattern splitter;
+
+ private int[] selectedFields;
+
+ private int[] groupingFields;
+
+ @Override
+ protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
+ String[] fields = splitter.split(value.toString());
+ if (fields.length != 4) {
+ log.info("{} {}", fields.length, value.toString());
+ context.getCounter("Map", "ERROR").increment(1);
+ return;
+ }
+ Collection<String> oKey = new ArrayList<>();
+ for (int groupingField : groupingFields) {
+ oKey.add(fields[groupingField]);
+ context.setStatus(fields[groupingField]);
+ }
+
+ List<String> oValue = new ArrayList<>();
+ for (int selectedField : selectedFields) {
+ oValue.add(fields[selectedField]);
+ }
+
+ context.write(new Text(oKey.toString()), new StringTuple(oValue));
+
+ }
+
+ @Override
+ protected void setup(Context context) throws IOException, InterruptedException {
+ super.setup(context);
+ Parameters params = new Parameters(context.getConfiguration().get("job.parameters", ""));
+ splitter = Pattern.compile(params.get("splitPattern", "[ \t]*\t[ \t]*"));
+
+ int selectedFieldCount = Integer.valueOf(params.get("selectedFieldCount", "0"));
+ selectedFields = new int[selectedFieldCount];
+ for (int i = 0; i < selectedFieldCount; i++) {
+ selectedFields[i] = Integer.valueOf(params.get("field" + i, "0"));
+ }
+
+ int groupingFieldCount = Integer.valueOf(params.get("groupingFieldCount", "0"));
+ groupingFields = new int[groupingFieldCount];
+ for (int i = 0; i < groupingFieldCount; i++) {
+ groupingFields[i] = Integer.valueOf(params.get("gfield" + i, "0"));
+ }
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/410ed16a/community/mahout-mr/mr-examples/src/main/java/org/apache/mahout/fpm/pfpgrowth/dataset/KeyBasedStringTupleReducer.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/mr-examples/src/main/java/org/apache/mahout/fpm/pfpgrowth/dataset/KeyBasedStringTupleReducer.java b/community/mahout-mr/mr-examples/src/main/java/org/apache/mahout/fpm/pfpgrowth/dataset/KeyBasedStringTupleReducer.java
new file mode 100644
index 0000000..a7ef762
--- /dev/null
+++ b/community/mahout-mr/mr-examples/src/main/java/org/apache/mahout/fpm/pfpgrowth/dataset/KeyBasedStringTupleReducer.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.fpm.pfpgrowth.dataset;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashSet;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.mahout.common.Parameters;
+import org.apache.mahout.common.StringTuple;
+
+ /**
+  * Reducer that gathers the distinct entries of all tuples for a key and writes them as
+  * tab-separated lines, starting a new line every {@code maxTransactionLength} entries.
+  */
+ public class KeyBasedStringTupleReducer extends Reducer<Text,StringTuple,Text,Text> {
+
+   private int maxTransactionLength = 100;
+
+   /**
+    * Writes the distinct entries collected for {@code key} with a {@code null} output key.
+    * Nothing is written when fewer than two distinct entries were collected.
+    *
+    * @param key the grouping key (not written to the output)
+    * @param values the tuples emitted for {@code key}
+    * @param context the task context receiving the output lines
+    */
+   @Override
+   protected void reduce(Text key, Iterable<StringTuple> values, Context context)
+     throws IOException, InterruptedException {
+     Collection<String> items = new HashSet<>();
+     for (StringTuple tuple : values) {
+       items.addAll(tuple.getEntries());
+     }
+
+     if (items.size() <= 1) {
+       return;
+     }
+
+     StringBuilder line = new StringBuilder();
+     boolean firstInLine = true;
+     int position = 0;
+     for (String item : items) {
+       if (position % maxTransactionLength == 0) {
+         // Flush the finished line (there is none before the very first entry) and start over.
+         if (position != 0) {
+           context.write(null, new Text(line.toString()));
+         }
+         line.setLength(0);
+         firstInLine = true;
+       }
+
+       if (!firstInLine) {
+         line.append("\t");
+       }
+       line.append(item);
+       firstInLine = false;
+
+       position++;
+     }
+
+     // Emit whatever remains in the last, possibly partial, line.
+     if (line.length() > 0) {
+       context.write(null, new Text(line.toString()));
+     }
+   }
+
+   /**
+    * Reads {@code maxTransactionLength} (default 100) from the {@code job.parameters}
+    * configuration entry.
+    *
+    * @param context the task context whose configuration carries the serialized parameters
+    */
+   @Override
+   protected void setup(Context context) throws IOException, InterruptedException {
+     super.setup(context);
+     String serialized = context.getConfiguration().get("job.parameters", "");
+     Parameters params = new Parameters(serialized);
+     maxTransactionLength = Integer.parseInt(params.get("maxTransactionLength", "100"));
+   }
+ }