You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mahout.apache.org by ra...@apache.org on 2018/06/27 14:52:14 UTC
[46/51] [partial] mahout git commit: MAHOUT-2042 and MAHOUT-2045
Delete directories which were moved/no longer in use
http://git-wip-us.apache.org/repos/asf/mahout/blob/e0573de3/community/mahout-mr/integration/src/main/java/org/apache/mahout/clustering/evaluation/ClusterEvaluator.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/integration/src/main/java/org/apache/mahout/clustering/evaluation/ClusterEvaluator.java b/community/mahout-mr/integration/src/main/java/org/apache/mahout/clustering/evaluation/ClusterEvaluator.java
new file mode 100644
index 0000000..757f38c
--- /dev/null
+++ b/community/mahout-mr/integration/src/main/java/org/apache/mahout/clustering/evaluation/ClusterEvaluator.java
@@ -0,0 +1,196 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.clustering.evaluation;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.mahout.clustering.Cluster;
+import org.apache.mahout.clustering.iterator.ClusterWritable;
+import org.apache.mahout.common.ClassUtils;
+import org.apache.mahout.common.distance.DistanceMeasure;
+import org.apache.mahout.common.iterator.sequencefile.PathFilters;
+import org.apache.mahout.common.iterator.sequencefile.PathType;
+import org.apache.mahout.common.iterator.sequencefile.SequenceFileDirValueIterable;
+import org.apache.mahout.math.RandomAccessSparseVector;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.Vector.Element;
+import org.apache.mahout.math.VectorWritable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+public class ClusterEvaluator {
+
+ private static final Logger log = LoggerFactory.getLogger(ClusterEvaluator.class);
+
+ private final Map<Integer,List<VectorWritable>> representativePoints;
+
+ private final List<Cluster> clusters;
+
+ private final DistanceMeasure measure;
+
+ /**
+ * For testing only
+ *
+ * @param representativePoints
+ * a Map<Integer,List<VectorWritable>> of representative points keyed by clusterId
+ * @param clusters
+ * a Map<Integer,Cluster> of the clusters keyed by clusterId
+ * @param measure
+ * an appropriate DistanceMeasure
+ */
+ public ClusterEvaluator(Map<Integer,List<VectorWritable>> representativePoints, List<Cluster> clusters,
+ DistanceMeasure measure) {
+ this.representativePoints = representativePoints;
+ this.clusters = clusters;
+ this.measure = measure;
+ }
+
+ /**
+ * Initialize a new instance from job information
+ *
+ * @param conf
+ * a Configuration with appropriate parameters
+ * @param clustersIn
+ * a String path to the input clusters directory
+ */
+ public ClusterEvaluator(Configuration conf, Path clustersIn) {
+ measure = ClassUtils
+ .instantiateAs(conf.get(RepresentativePointsDriver.DISTANCE_MEASURE_KEY), DistanceMeasure.class);
+ representativePoints = RepresentativePointsMapper.getRepresentativePoints(conf);
+ clusters = loadClusters(conf, clustersIn);
+ }
+
+ /**
+ * Load the clusters from their sequence files
+ *
+ * @param clustersIn
+ * a String pathname to the directory containing input cluster files
+ * @return a List<Cluster> of the clusters
+ */
+ private static List<Cluster> loadClusters(Configuration conf, Path clustersIn) {
+ List<Cluster> clusters = new ArrayList<>();
+ for (ClusterWritable clusterWritable : new SequenceFileDirValueIterable<ClusterWritable>(clustersIn, PathType.LIST,
+ PathFilters.logsCRCFilter(), conf)) {
+ Cluster cluster = clusterWritable.getValue();
+ clusters.add(cluster);
+ }
+ return clusters;
+ }
+
+ /**
+ * Computes the inter-cluster density as defined in "Mahout In Action"
+ *
+ * @return the interClusterDensity
+ */
+ public double interClusterDensity() {
+ double max = Double.NEGATIVE_INFINITY;
+ double min = Double.POSITIVE_INFINITY;
+ double sum = 0;
+ int count = 0;
+ Map<Integer,Vector> distances = interClusterDistances();
+ for (Vector row : distances.values()) {
+ for (Element element : row.nonZeroes()) {
+ double d = element.get();
+ min = Math.min(d, min);
+ max = Math.max(d, max);
+ sum += d;
+ count++;
+ }
+ }
+ double density = (sum / count - min) / (max - min);
+ log.info("Scaled Inter-Cluster Density = {}", density);
+ return density;
+ }
+
+ /**
+ * Computes the inter-cluster distances
+ *
+ * @return a Map<Integer, Vector>
+ */
+ public Map<Integer,Vector> interClusterDistances() {
+ Map<Integer,Vector> distances = new TreeMap<>();
+ for (int i = 0; i < clusters.size(); i++) {
+ Cluster clusterI = clusters.get(i);
+ RandomAccessSparseVector row = new RandomAccessSparseVector(Integer.MAX_VALUE);
+ distances.put(clusterI.getId(), row);
+ for (int j = i + 1; j < clusters.size(); j++) {
+ Cluster clusterJ = clusters.get(j);
+ double d = measure.distance(clusterI.getCenter(), clusterJ.getCenter());
+ row.set(clusterJ.getId(), d);
+ }
+ }
+ return distances;
+ }
+
+ /**
+ * Computes the average intra-cluster density as the average of each cluster's intra-cluster density
+ *
+ * @return the average intraClusterDensity
+ */
+ public double intraClusterDensity() {
+ double avgDensity = 0;
+ int count = 0;
+ for (Element elem : intraClusterDensities().nonZeroes()) {
+ double value = elem.get();
+ if (!Double.isNaN(value)) {
+ avgDensity += value;
+ count++;
+ }
+ }
+ avgDensity = clusters.isEmpty() ? 0 : avgDensity / count;
+ log.info("Average Intra-Cluster Density = {}", avgDensity);
+ return avgDensity;
+ }
+
+ /**
+ * Computes the intra-cluster densities for all clusters as the average distance of the representative points from
+ * each other
+ *
+ * @return a Vector of the intraClusterDensity of the representativePoints by clusterId
+ */
+ public Vector intraClusterDensities() {
+ Vector densities = new RandomAccessSparseVector(Integer.MAX_VALUE);
+ for (Cluster cluster : clusters) {
+ int count = 0;
+ double max = Double.NEGATIVE_INFINITY;
+ double min = Double.POSITIVE_INFINITY;
+ double sum = 0;
+ List<VectorWritable> repPoints = representativePoints.get(cluster.getId());
+ for (int i = 0; i < repPoints.size(); i++) {
+ for (int j = i + 1; j < repPoints.size(); j++) {
+ Vector v1 = repPoints.get(i).get();
+ Vector v2 = repPoints.get(j).get();
+ double d = measure.distance(v1, v2);
+ min = Math.min(d, min);
+ max = Math.max(d, max);
+ sum += d;
+ count++;
+ }
+ }
+ double density = (sum / count - min) / (max - min);
+ densities.set(cluster.getId(), density);
+ log.info("Intra-Cluster Density[{}] = {}", cluster.getId(), density);
+ }
+ return densities;
+ }
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/e0573de3/community/mahout-mr/integration/src/main/java/org/apache/mahout/clustering/evaluation/RepresentativePointsDriver.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/integration/src/main/java/org/apache/mahout/clustering/evaluation/RepresentativePointsDriver.java b/community/mahout-mr/integration/src/main/java/org/apache/mahout/clustering/evaluation/RepresentativePointsDriver.java
new file mode 100644
index 0000000..2fe37ef
--- /dev/null
+++ b/community/mahout-mr/integration/src/main/java/org/apache/mahout/clustering/evaluation/RepresentativePointsDriver.java
@@ -0,0 +1,243 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.clustering.evaluation;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.mahout.clustering.AbstractCluster;
+import org.apache.mahout.clustering.Cluster;
+import org.apache.mahout.clustering.classify.WeightedVectorWritable;
+import org.apache.mahout.clustering.iterator.ClusterWritable;
+import org.apache.mahout.common.AbstractJob;
+import org.apache.mahout.common.ClassUtils;
+import org.apache.mahout.common.Pair;
+import org.apache.mahout.common.commandline.DefaultOptionCreator;
+import org.apache.mahout.common.distance.DistanceMeasure;
+import org.apache.mahout.common.iterator.sequencefile.PathFilters;
+import org.apache.mahout.common.iterator.sequencefile.PathType;
+import org.apache.mahout.common.iterator.sequencefile.SequenceFileDirIterable;
+import org.apache.mahout.common.iterator.sequencefile.SequenceFileValueIterable;
+import org.apache.mahout.math.VectorWritable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public final class RepresentativePointsDriver extends AbstractJob {
+
+ public static final String STATE_IN_KEY = "org.apache.mahout.clustering.stateIn";
+
+ public static final String DISTANCE_MEASURE_KEY = "org.apache.mahout.clustering.measure";
+
+ private static final Logger log = LoggerFactory.getLogger(RepresentativePointsDriver.class);
+
+ private RepresentativePointsDriver() {}
+
+ public static void main(String[] args) throws Exception {
+ ToolRunner.run(new Configuration(), new RepresentativePointsDriver(), args);
+ }
+
+ @Override
+ public int run(String[] args) throws ClassNotFoundException, IOException, InterruptedException {
+ addInputOption();
+ addOutputOption();
+ addOption("clusteredPoints", "cp", "The path to the clustered points", true);
+ addOption(DefaultOptionCreator.distanceMeasureOption().create());
+ addOption(DefaultOptionCreator.maxIterationsOption().create());
+ addOption(DefaultOptionCreator.methodOption().create());
+ if (parseArguments(args) == null) {
+ return -1;
+ }
+
+ Path input = getInputPath();
+ Path output = getOutputPath();
+ String distanceMeasureClass = getOption(DefaultOptionCreator.DISTANCE_MEASURE_OPTION);
+ int maxIterations = Integer.parseInt(getOption(DefaultOptionCreator.MAX_ITERATIONS_OPTION));
+ boolean runSequential = getOption(DefaultOptionCreator.METHOD_OPTION).equalsIgnoreCase(
+ DefaultOptionCreator.SEQUENTIAL_METHOD);
+ DistanceMeasure measure = ClassUtils.instantiateAs(distanceMeasureClass, DistanceMeasure.class);
+ Path clusteredPoints = new Path(getOption("clusteredPoints"));
+ run(getConf(), input, clusteredPoints, output, measure, maxIterations, runSequential);
+ return 0;
+ }
+
+ /**
+ * Utility to print out representative points
+ *
+ * @param output
+ * the Path to the directory containing representativePoints-i folders
+ * @param numIterations
+ * the int number of iterations to print
+ */
+ public static void printRepresentativePoints(Path output, int numIterations) {
+ for (int i = 0; i <= numIterations; i++) {
+ Path out = new Path(output, "representativePoints-" + i);
+ System.out.println("Representative Points for iteration " + i);
+ Configuration conf = new Configuration();
+ for (Pair<IntWritable,VectorWritable> record : new SequenceFileDirIterable<IntWritable,VectorWritable>(out,
+ PathType.LIST, PathFilters.logsCRCFilter(), null, true, conf)) {
+ System.out.println("\tC-" + record.getFirst().get() + ": "
+ + AbstractCluster.formatVector(record.getSecond().get(), null));
+ }
+ }
+ }
+
+ public static void run(Configuration conf, Path clustersIn, Path clusteredPointsIn, Path output,
+ DistanceMeasure measure, int numIterations, boolean runSequential) throws IOException, InterruptedException,
+ ClassNotFoundException {
+ Path stateIn = new Path(output, "representativePoints-0");
+ writeInitialState(stateIn, clustersIn);
+
+ for (int iteration = 0; iteration < numIterations; iteration++) {
+ log.info("Representative Points Iteration {}", iteration);
+ // point the output to a new directory per iteration
+ Path stateOut = new Path(output, "representativePoints-" + (iteration + 1));
+ runIteration(conf, clusteredPointsIn, stateIn, stateOut, measure, runSequential);
+ // now point the input to the old output directory
+ stateIn = stateOut;
+ }
+
+ conf.set(STATE_IN_KEY, stateIn.toString());
+ conf.set(DISTANCE_MEASURE_KEY, measure.getClass().getName());
+ }
+
+ private static void writeInitialState(Path output, Path clustersIn) throws IOException {
+ Configuration conf = new Configuration();
+ FileSystem fs = FileSystem.get(output.toUri(), conf);
+ for (FileStatus dir : fs.globStatus(clustersIn)) {
+ Path inPath = dir.getPath();
+ for (FileStatus part : fs.listStatus(inPath, PathFilters.logsCRCFilter())) {
+ Path inPart = part.getPath();
+ Path path = new Path(output, inPart.getName());
+ try (SequenceFile.Writer writer =
+ new SequenceFile.Writer(fs, conf, path, IntWritable.class, VectorWritable.class)){
+ for (ClusterWritable clusterWritable : new SequenceFileValueIterable<ClusterWritable>(inPart, true, conf)) {
+ Cluster cluster = clusterWritable.getValue();
+ if (log.isDebugEnabled()) {
+ log.debug("C-{}: {}", cluster.getId(), AbstractCluster.formatVector(cluster.getCenter(), null));
+ }
+ writer.append(new IntWritable(cluster.getId()), new VectorWritable(cluster.getCenter()));
+ }
+ }
+ }
+ }
+ }
+
+ private static void runIteration(Configuration conf, Path clusteredPointsIn, Path stateIn, Path stateOut,
+ DistanceMeasure measure, boolean runSequential) throws IOException, InterruptedException, ClassNotFoundException {
+ if (runSequential) {
+ runIterationSeq(conf, clusteredPointsIn, stateIn, stateOut, measure);
+ } else {
+ runIterationMR(conf, clusteredPointsIn, stateIn, stateOut, measure);
+ }
+ }
+
+ /**
+ * Run the job using supplied arguments as a sequential process
+ *
+ * @param conf
+ * the Configuration to use
+ * @param clusteredPointsIn
+ * the directory pathname for input points
+ * @param stateIn
+ * the directory pathname for input state
+ * @param stateOut
+ * the directory pathname for output state
+ * @param measure
+ * the DistanceMeasure to use
+ */
+ private static void runIterationSeq(Configuration conf, Path clusteredPointsIn, Path stateIn, Path stateOut,
+ DistanceMeasure measure) throws IOException {
+
+ Map<Integer,List<VectorWritable>> repPoints = RepresentativePointsMapper.getRepresentativePoints(conf, stateIn);
+ Map<Integer,WeightedVectorWritable> mostDistantPoints = new HashMap<>();
+ FileSystem fs = FileSystem.get(clusteredPointsIn.toUri(), conf);
+ for (Pair<IntWritable,WeightedVectorWritable> record
+ : new SequenceFileDirIterable<IntWritable,WeightedVectorWritable>(clusteredPointsIn, PathType.LIST,
+ PathFilters.logsCRCFilter(), null, true, conf)) {
+ RepresentativePointsMapper.mapPoint(record.getFirst(), record.getSecond(), measure, repPoints, mostDistantPoints);
+ }
+ int part = 0;
+ try (SequenceFile.Writer writer = new SequenceFile.Writer(fs, conf, new Path(stateOut, "part-m-" + part++),
+ IntWritable.class, VectorWritable.class)){
+ for (Entry<Integer,List<VectorWritable>> entry : repPoints.entrySet()) {
+ for (VectorWritable vw : entry.getValue()) {
+ writer.append(new IntWritable(entry.getKey()), vw);
+ }
+ }
+ }
+ try (SequenceFile.Writer writer = new SequenceFile.Writer(fs, conf, new Path(stateOut, "part-m-" + part++),
+ IntWritable.class, VectorWritable.class)){
+ for (Map.Entry<Integer,WeightedVectorWritable> entry : mostDistantPoints.entrySet()) {
+ writer.append(new IntWritable(entry.getKey()), new VectorWritable(entry.getValue().getVector()));
+ }
+ }
+ }
+
+ /**
+ * Run the job using supplied arguments as a Map/Reduce process
+ *
+ * @param conf
+ * the Configuration to use
+ * @param input
+ * the directory pathname for input points
+ * @param stateIn
+ * the directory pathname for input state
+ * @param stateOut
+ * the directory pathname for output state
+ * @param measure
+ * the DistanceMeasure to use
+ */
+ private static void runIterationMR(Configuration conf, Path input, Path stateIn, Path stateOut,
+ DistanceMeasure measure) throws IOException, InterruptedException, ClassNotFoundException {
+ conf.set(STATE_IN_KEY, stateIn.toString());
+ conf.set(DISTANCE_MEASURE_KEY, measure.getClass().getName());
+ Job job = new Job(conf, "Representative Points Driver running over input: " + input);
+ job.setJarByClass(RepresentativePointsDriver.class);
+ job.setOutputKeyClass(IntWritable.class);
+ job.setOutputValueClass(VectorWritable.class);
+ job.setMapOutputKeyClass(IntWritable.class);
+ job.setMapOutputValueClass(WeightedVectorWritable.class);
+
+ FileInputFormat.setInputPaths(job, input);
+ FileOutputFormat.setOutputPath(job, stateOut);
+
+ job.setMapperClass(RepresentativePointsMapper.class);
+ job.setReducerClass(RepresentativePointsReducer.class);
+ job.setInputFormatClass(SequenceFileInputFormat.class);
+ job.setOutputFormatClass(SequenceFileOutputFormat.class);
+
+ boolean succeeded = job.waitForCompletion(true);
+ if (!succeeded) {
+ throw new IllegalStateException("Job failed!");
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/e0573de3/community/mahout-mr/integration/src/main/java/org/apache/mahout/clustering/evaluation/RepresentativePointsMapper.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/integration/src/main/java/org/apache/mahout/clustering/evaluation/RepresentativePointsMapper.java b/community/mahout-mr/integration/src/main/java/org/apache/mahout/clustering/evaluation/RepresentativePointsMapper.java
new file mode 100644
index 0000000..0ae79ad
--- /dev/null
+++ b/community/mahout-mr/integration/src/main/java/org/apache/mahout/clustering/evaluation/RepresentativePointsMapper.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.clustering.evaluation;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.mahout.clustering.classify.WeightedVectorWritable;
+import org.apache.mahout.common.ClassUtils;
+import org.apache.mahout.common.Pair;
+import org.apache.mahout.common.distance.DistanceMeasure;
+import org.apache.mahout.common.distance.EuclideanDistanceMeasure;
+import org.apache.mahout.common.iterator.sequencefile.PathFilters;
+import org.apache.mahout.common.iterator.sequencefile.PathType;
+import org.apache.mahout.common.iterator.sequencefile.SequenceFileDirIterable;
+import org.apache.mahout.math.VectorWritable;
+
+public class RepresentativePointsMapper
+ extends Mapper<IntWritable, WeightedVectorWritable, IntWritable, WeightedVectorWritable> {
+
+ private Map<Integer, List<VectorWritable>> representativePoints;
+ private final Map<Integer, WeightedVectorWritable> mostDistantPoints = new HashMap<>();
+ private DistanceMeasure measure = new EuclideanDistanceMeasure();
+
+ @Override
+ protected void cleanup(Context context) throws IOException, InterruptedException {
+ for (Map.Entry<Integer, WeightedVectorWritable> entry : mostDistantPoints.entrySet()) {
+ context.write(new IntWritable(entry.getKey()), entry.getValue());
+ }
+ super.cleanup(context);
+ }
+
+ @Override
+ protected void map(IntWritable clusterId, WeightedVectorWritable point, Context context)
+ throws IOException, InterruptedException {
+ mapPoint(clusterId, point, measure, representativePoints, mostDistantPoints);
+ }
+
+ public static void mapPoint(IntWritable clusterId,
+ WeightedVectorWritable point,
+ DistanceMeasure measure,
+ Map<Integer, List<VectorWritable>> representativePoints,
+ Map<Integer, WeightedVectorWritable> mostDistantPoints) {
+ int key = clusterId.get();
+ WeightedVectorWritable currentMDP = mostDistantPoints.get(key);
+
+ List<VectorWritable> repPoints = representativePoints.get(key);
+ double totalDistance = 0.0;
+ if (repPoints != null) {
+ for (VectorWritable refPoint : repPoints) {
+ totalDistance += measure.distance(refPoint.get(), point.getVector());
+ }
+ }
+ if (currentMDP == null || currentMDP.getWeight() < totalDistance) {
+ mostDistantPoints.put(key, new WeightedVectorWritable(totalDistance, point.getVector().clone()));
+ }
+ }
+
+ @Override
+ protected void setup(Context context) throws IOException, InterruptedException {
+ super.setup(context);
+ Configuration conf = context.getConfiguration();
+ measure =
+ ClassUtils.instantiateAs(conf.get(RepresentativePointsDriver.DISTANCE_MEASURE_KEY), DistanceMeasure.class);
+ representativePoints = getRepresentativePoints(conf);
+ }
+
+ public void configure(Map<Integer, List<VectorWritable>> referencePoints, DistanceMeasure measure) {
+ this.representativePoints = referencePoints;
+ this.measure = measure;
+ }
+
+ public static Map<Integer, List<VectorWritable>> getRepresentativePoints(Configuration conf) {
+ String statePath = conf.get(RepresentativePointsDriver.STATE_IN_KEY);
+ return getRepresentativePoints(conf, new Path(statePath));
+ }
+
+ public static Map<Integer, List<VectorWritable>> getRepresentativePoints(Configuration conf, Path statePath) {
+ Map<Integer, List<VectorWritable>> representativePoints = new HashMap<>();
+ for (Pair<IntWritable,VectorWritable> record
+ : new SequenceFileDirIterable<IntWritable,VectorWritable>(statePath,
+ PathType.LIST,
+ PathFilters.logsCRCFilter(),
+ conf)) {
+ int keyValue = record.getFirst().get();
+ List<VectorWritable> repPoints = representativePoints.get(keyValue);
+ if (repPoints == null) {
+ repPoints = new ArrayList<>();
+ representativePoints.put(keyValue, repPoints);
+ }
+ repPoints.add(record.getSecond());
+ }
+ return representativePoints;
+ }
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/e0573de3/community/mahout-mr/integration/src/main/java/org/apache/mahout/clustering/evaluation/RepresentativePointsReducer.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/integration/src/main/java/org/apache/mahout/clustering/evaluation/RepresentativePointsReducer.java b/community/mahout-mr/integration/src/main/java/org/apache/mahout/clustering/evaluation/RepresentativePointsReducer.java
new file mode 100644
index 0000000..27ca861
--- /dev/null
+++ b/community/mahout-mr/integration/src/main/java/org/apache/mahout/clustering/evaluation/RepresentativePointsReducer.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.clustering.evaluation;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.mahout.clustering.classify.WeightedVectorWritable;
+import org.apache.mahout.math.VectorWritable;
+
+public class RepresentativePointsReducer
+ extends Reducer<IntWritable, WeightedVectorWritable, IntWritable, VectorWritable> {
+
+ private Map<Integer, List<VectorWritable>> representativePoints;
+
+ @Override
+ protected void cleanup(Context context) throws IOException, InterruptedException {
+ for (Map.Entry<Integer, List<VectorWritable>> entry : representativePoints.entrySet()) {
+ IntWritable iw = new IntWritable(entry.getKey());
+ for (VectorWritable vw : entry.getValue()) {
+ context.write(iw, vw);
+ }
+ }
+ super.cleanup(context);
+ }
+
+ @Override
+ protected void reduce(IntWritable key, Iterable<WeightedVectorWritable> values, Context context)
+ throws IOException, InterruptedException {
+ // find the most distant point
+ WeightedVectorWritable mdp = null;
+ for (WeightedVectorWritable dpw : values) {
+ if (mdp == null || mdp.getWeight() < dpw.getWeight()) {
+ mdp = new WeightedVectorWritable(dpw.getWeight(), dpw.getVector());
+ }
+ }
+ context.write(new IntWritable(key.get()), new VectorWritable(mdp.getVector()));
+ }
+
+ @Override
+ protected void setup(Context context) throws IOException, InterruptedException {
+ super.setup(context);
+ Configuration conf = context.getConfiguration();
+ representativePoints = RepresentativePointsMapper.getRepresentativePoints(conf);
+ }
+
+ public void configure(Map<Integer, List<VectorWritable>> representativePoints) {
+ this.representativePoints = representativePoints;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/e0573de3/community/mahout-mr/integration/src/main/java/org/apache/mahout/clustering/lda/LDAPrintTopics.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/integration/src/main/java/org/apache/mahout/clustering/lda/LDAPrintTopics.java b/community/mahout-mr/integration/src/main/java/org/apache/mahout/clustering/lda/LDAPrintTopics.java
new file mode 100644
index 0000000..392909e
--- /dev/null
+++ b/community/mahout-mr/integration/src/main/java/org/apache/mahout/clustering/lda/LDAPrintTopics.java
@@ -0,0 +1,229 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.clustering.lda;
+
+import com.google.common.io.Closeables;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.io.Writer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.PriorityQueue;
+import java.util.Queue;
+import org.apache.commons.cli2.CommandLine;
+import org.apache.commons.cli2.Group;
+import org.apache.commons.cli2.Option;
+import org.apache.commons.cli2.OptionException;
+import org.apache.commons.cli2.builder.ArgumentBuilder;
+import org.apache.commons.cli2.builder.DefaultOptionBuilder;
+import org.apache.commons.cli2.builder.GroupBuilder;
+import org.apache.commons.cli2.commandline.Parser;
+import org.apache.commons.io.Charsets;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.mahout.common.CommandLineUtil;
+import org.apache.mahout.common.IntPairWritable;
+import org.apache.mahout.common.Pair;
+import org.apache.mahout.common.commandline.DefaultOptionCreator;
+import org.apache.mahout.common.iterator.sequencefile.PathType;
+import org.apache.mahout.common.iterator.sequencefile.SequenceFileDirIterable;
+import org.apache.mahout.utils.vectors.VectorHelper;
+
+/**
+ * Class to print out the top K words for each topic.
+ */
+public final class LDAPrintTopics {
+
+ private LDAPrintTopics() { }
+
+ // Expands the queue list to have a Queue for topic K
+ private static void ensureQueueSize(Collection<Queue<Pair<String,Double>>> queues, int k) {
+ for (int i = queues.size(); i <= k; ++i) {
+ queues.add(new PriorityQueue<Pair<String,Double>>());
+ }
+ }
+
+ public static void main(String[] args) throws Exception {
+ DefaultOptionBuilder obuilder = new DefaultOptionBuilder();
+ ArgumentBuilder abuilder = new ArgumentBuilder();
+ GroupBuilder gbuilder = new GroupBuilder();
+
+ Option inputOpt = DefaultOptionCreator.inputOption().create();
+
+ Option dictOpt = obuilder.withLongName("dict").withRequired(true).withArgument(
+ abuilder.withName("dict").withMinimum(1).withMaximum(1).create()).withDescription(
+ "Dictionary to read in, in the same format as one created by "
+ + "org.apache.mahout.utils.vectors.lucene.Driver").withShortName("d").create();
+
+ Option outOpt = DefaultOptionCreator.outputOption().create();
+
+ Option wordOpt = obuilder.withLongName("words").withRequired(false).withArgument(
+ abuilder.withName("words").withMinimum(0).withMaximum(1).withDefault("20").create()).withDescription(
+ "Number of words to print").withShortName("w").create();
+ Option dictTypeOpt = obuilder.withLongName("dictionaryType").withRequired(false).withArgument(
+ abuilder.withName("dictionaryType").withMinimum(1).withMaximum(1).create()).withDescription(
+ "The dictionary file type (text|sequencefile)").withShortName("dt").create();
+ Option helpOpt = obuilder.withLongName("help").withDescription("Print out help").withShortName("h")
+ .create();
+
+ Group group = gbuilder.withName("Options").withOption(dictOpt).withOption(outOpt).withOption(wordOpt)
+ .withOption(inputOpt).withOption(dictTypeOpt).create();
+ try {
+ Parser parser = new Parser();
+ parser.setGroup(group);
+ CommandLine cmdLine = parser.parse(args);
+
+ if (cmdLine.hasOption(helpOpt)) {
+ CommandLineUtil.printHelp(group);
+ return;
+ }
+
+ String input = cmdLine.getValue(inputOpt).toString();
+ String dictFile = cmdLine.getValue(dictOpt).toString();
+ int numWords = 20;
+ if (cmdLine.hasOption(wordOpt)) {
+ numWords = Integer.parseInt(cmdLine.getValue(wordOpt).toString());
+ }
+ Configuration config = new Configuration();
+
+ String dictionaryType = "text";
+ if (cmdLine.hasOption(dictTypeOpt)) {
+ dictionaryType = cmdLine.getValue(dictTypeOpt).toString();
+ }
+
+ List<String> wordList;
+ if ("text".equals(dictionaryType)) {
+ wordList = Arrays.asList(VectorHelper.loadTermDictionary(new File(dictFile)));
+ } else if ("sequencefile".equals(dictionaryType)) {
+ wordList = Arrays.asList(VectorHelper.loadTermDictionary(config, dictFile));
+ } else {
+ throw new IllegalArgumentException("Invalid dictionary format");
+ }
+
+ List<Queue<Pair<String,Double>>> topWords = topWordsForTopics(input, config, wordList, numWords);
+
+ File output = null;
+ if (cmdLine.hasOption(outOpt)) {
+ output = new File(cmdLine.getValue(outOpt).toString());
+ if (!output.exists() && !output.mkdirs()) {
+ throw new IOException("Could not create directory: " + output);
+ }
+ }
+ printTopWords(topWords, output);
+ } catch (OptionException e) {
+ CommandLineUtil.printHelp(group);
+ throw e;
+ }
+ }
+
+ // Adds the word if the queue is below capacity, or the score is high enough
+ private static void maybeEnqueue(Queue<Pair<String,Double>> q, String word, double score, int numWordsToPrint) {
+ if (q.size() >= numWordsToPrint && score > q.peek().getSecond()) {
+ q.poll();
+ }
+ if (q.size() < numWordsToPrint) {
+ q.add(new Pair<>(word, score));
+ }
+ }
+
+ private static void printTopWords(List<Queue<Pair<String,Double>>> topWords, File outputDir)
+ throws IOException {
+ for (int i = 0; i < topWords.size(); ++i) {
+ Collection<Pair<String,Double>> topK = topWords.get(i);
+ Writer out = null;
+ boolean printingToSystemOut = false;
+ try {
+ if (outputDir != null) {
+ out = new OutputStreamWriter(new FileOutputStream(new File(outputDir, "topic_" + i)), Charsets.UTF_8);
+ } else {
+ out = new OutputStreamWriter(System.out, Charsets.UTF_8);
+ printingToSystemOut = true;
+ out.write("Topic " + i);
+ out.write('\n');
+ out.write("===========");
+ out.write('\n');
+ }
+ List<Pair<String,Double>> topKasList = new ArrayList<>(topK.size());
+ for (Pair<String,Double> wordWithScore : topK) {
+ topKasList.add(wordWithScore);
+ }
+ Collections.sort(topKasList, new Comparator<Pair<String,Double>>() {
+ @Override
+ public int compare(Pair<String,Double> pair1, Pair<String,Double> pair2) {
+ return pair2.getSecond().compareTo(pair1.getSecond());
+ }
+ });
+ for (Pair<String,Double> wordWithScore : topKasList) {
+ out.write(wordWithScore.getFirst() + " [p(" + wordWithScore.getFirst() + "|topic_" + i + ") = "
+ + wordWithScore.getSecond());
+ out.write('\n');
+ }
+ } finally {
+ if (!printingToSystemOut) {
+ Closeables.close(out, false);
+ } else {
+ out.flush();
+ }
+ }
+ }
+ }
+
+ private static List<Queue<Pair<String,Double>>> topWordsForTopics(String dir,
+ Configuration job,
+ List<String> wordList,
+ int numWordsToPrint) {
+ List<Queue<Pair<String,Double>>> queues = new ArrayList<>();
+ Map<Integer,Double> expSums = new HashMap<>();
+ for (Pair<IntPairWritable,DoubleWritable> record
+ : new SequenceFileDirIterable<IntPairWritable, DoubleWritable>(
+ new Path(dir, "part-*"), PathType.GLOB, null, null, true, job)) {
+ IntPairWritable key = record.getFirst();
+ int topic = key.getFirst();
+ int word = key.getSecond();
+ ensureQueueSize(queues, topic);
+ if (word >= 0 && topic >= 0) {
+ double score = record.getSecond().get();
+ if (expSums.get(topic) == null) {
+ expSums.put(topic, 0.0);
+ }
+ expSums.put(topic, expSums.get(topic) + Math.exp(score));
+ String realWord = wordList.get(word);
+ maybeEnqueue(queues.get(topic), realWord, score, numWordsToPrint);
+ }
+ }
+ for (int i = 0; i < queues.size(); i++) {
+ Queue<Pair<String,Double>> queue = queues.get(i);
+ Queue<Pair<String,Double>> newQueue = new PriorityQueue<>(queue.size());
+ double norm = expSums.get(i);
+ for (Pair<String,Double> pair : queue) {
+ newQueue.add(new Pair<>(pair.getFirst(), Math.exp(pair.getSecond()) / norm));
+ }
+ queues.set(i, newQueue);
+ }
+ return queues;
+ }
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/e0573de3/community/mahout-mr/integration/src/main/java/org/apache/mahout/text/MailArchivesClusteringAnalyzer.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/integration/src/main/java/org/apache/mahout/text/MailArchivesClusteringAnalyzer.java b/community/mahout-mr/integration/src/main/java/org/apache/mahout/text/MailArchivesClusteringAnalyzer.java
new file mode 100644
index 0000000..12ed471
--- /dev/null
+++ b/community/mahout-mr/integration/src/main/java/org/apache/mahout/text/MailArchivesClusteringAnalyzer.java
@@ -0,0 +1,164 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.mahout.text;
+
+import org.apache.lucene.analysis.TokenFilter;
+import org.apache.lucene.analysis.TokenStream;
+import org.apache.lucene.analysis.Tokenizer;
+import org.apache.lucene.analysis.core.LowerCaseFilter;
+import org.apache.lucene.analysis.core.StopFilter;
+import org.apache.lucene.analysis.en.PorterStemFilter;
+import org.apache.lucene.analysis.miscellaneous.ASCIIFoldingFilter;
+import org.apache.lucene.analysis.standard.StandardFilter;
+import org.apache.lucene.analysis.standard.StandardTokenizer;
+import org.apache.lucene.analysis.tokenattributes.CharTermAttribute;
+import org.apache.lucene.analysis.util.CharArraySet;
+import org.apache.lucene.analysis.util.StopwordAnalyzerBase;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * Custom Lucene Analyzer designed for aggressive feature reduction
+ * for clustering the ASF Mail Archives using an extended set of
+ * stop words, excluding non-alpha-numeric tokens, and porter stemming.
+ */
+public final class MailArchivesClusteringAnalyzer extends StopwordAnalyzerBase {
+ // extended set of stop words composed of common mail terms like "hi",
+ // HTML tags, and Java keywords asmany of the messages in the archives
+ // are subversion check-in notifications
+
+ private static final CharArraySet STOP_SET = new CharArraySet(Arrays.asList(
+ "3d","7bit","a0","about","above","abstract","across","additional","after",
+ "afterwards","again","against","align","all","almost","alone","along",
+ "already","also","although","always","am","among","amongst","amoungst",
+ "amount","an","and","another","any","anybody","anyhow","anyone","anything",
+ "anyway","anywhere","are","arial","around","as","ascii","assert","at",
+ "back","background","base64","bcc","be","became","because","become","becomes",
+ "becoming","been","before","beforehand","behind","being","below","beside",
+ "besides","between","beyond","bgcolor","blank","blockquote","body","boolean",
+ "border","both","br","break","but","by","can","cannot","cant","case","catch",
+ "cc","cellpadding","cellspacing","center","char","charset","cheers","class",
+ "co","color","colspan","com","con","const","continue","could","couldnt",
+ "cry","css","de","dear","default","did","didnt","different","div","do",
+ "does","doesnt","done","dont","double","down","due","during","each","eg",
+ "eight","either","else","elsewhere","empty","encoding","enough","enum",
+ "etc","eu","even","ever","every","everyone","everything","everywhere",
+ "except","extends","face","family","few","ffffff","final","finally","float",
+ "font","for","former","formerly","fri","from","further","get","give","go",
+ "good","got","goto","gt","h1","ha","had","has","hasnt","have","he","head",
+ "height","hello","helvetica","hence","her","here","hereafter","hereby",
+ "herein","hereupon","hers","herself","hi","him","himself","his","how",
+ "however","hr","href","html","http","https","id","ie","if","ill","im",
+ "image","img","implements","import","in","inc","instanceof","int","interface",
+ "into","is","isnt","iso-8859-1","it","its","itself","ive","just","keep",
+ "last","latter","latterly","least","left","less","li","like","long","look",
+ "lt","ltd","mail","mailto","many","margin","may","me","meanwhile","message",
+ "meta","might","mill","mine","mon","more","moreover","most","mostly","mshtml",
+ "mso","much","must","my","myself","name","namely","native","nbsp","need",
+ "neither","never","nevertheless","new","next","nine","no","nobody","none",
+ "noone","nor","not","nothing","now","nowhere","null","of","off","often",
+ "ok","on","once","only","onto","or","org","other","others","otherwise",
+ "our","ours","ourselves","out","over","own","package","pad","per","perhaps",
+ "plain","please","pm","printable","private","protected","public","put",
+ "quot","quote","r1","r2","rather","re","really","regards","reply","return",
+ "right","said","same","sans","sat","say","saying","see","seem","seemed",
+ "seeming","seems","serif","serious","several","she","short","should","show",
+ "side","since","sincere","six","sixty","size","so","solid","some","somehow",
+ "someone","something","sometime","sometimes","somewhere","span","src",
+ "static","still","strictfp","string","strong","style","stylesheet","subject",
+ "such","sun","super","sure","switch","synchronized","table","take","target",
+ "td","text","th","than","thanks","that","the","their","them","themselves",
+ "then","thence","there","thereafter","thereby","therefore","therein","thereupon",
+ "these","they","thick","thin","think","third","this","those","though",
+ "three","through","throughout","throw","throws","thru","thu","thus","tm",
+ "to","together","too","top","toward","towards","tr","transfer","transient",
+ "try","tue","type","ul","un","under","unsubscribe","until","up","upon",
+ "us","use","used","uses","using","valign","verdana","very","via","void",
+ "volatile","want","was","we","wed","weight","well","were","what","whatever",
+ "when","whence","whenever","where","whereafter","whereas","whereby","wherein",
+ "whereupon","wherever","whether","which","while","whither","who","whoever",
+ "whole","whom","whose","why","width","will","with","within","without",
+ "wont","would","wrote","www","yes","yet","you","your","yours","yourself",
+ "yourselves"
+ ), false);
+
+ // Regex used to exclude non-alpha-numeric tokens
+ private static final Pattern ALPHA_NUMERIC = Pattern.compile("^[a-z][a-z0-9_]+$");
+ private static final Matcher MATCHER = ALPHA_NUMERIC.matcher("");
+
+ public MailArchivesClusteringAnalyzer() {
+ super(STOP_SET);
+ }
+
+ public MailArchivesClusteringAnalyzer(CharArraySet stopSet) {
+ super(stopSet);
+ }
+
+ @Override
+ protected TokenStreamComponents createComponents(String fieldName) {
+ Tokenizer tokenizer = new StandardTokenizer();
+ TokenStream result = new StandardFilter(tokenizer);
+ result = new LowerCaseFilter(result);
+ result = new ASCIIFoldingFilter(result);
+ result = new AlphaNumericMaxLengthFilter(result);
+ result = new StopFilter(result, STOP_SET);
+ result = new PorterStemFilter(result);
+ return new TokenStreamComponents(tokenizer, result);
+ }
+
+ /**
+ * Matches alpha-numeric tokens between 2 and 40 chars long.
+ */
+ static class AlphaNumericMaxLengthFilter extends TokenFilter {
+ private final CharTermAttribute termAtt;
+ private final char[] output = new char[28];
+
+ AlphaNumericMaxLengthFilter(TokenStream in) {
+ super(in);
+ termAtt = addAttribute(CharTermAttribute.class);
+ }
+
+ @Override
+ public final boolean incrementToken() throws IOException {
+ // return the first alpha-numeric token between 2 and 40 length
+ while (input.incrementToken()) {
+ int length = termAtt.length();
+ if (length >= 2 && length <= 28) {
+ char[] buf = termAtt.buffer();
+ int at = 0;
+ for (int c = 0; c < length; c++) {
+ char ch = buf[c];
+ if (ch != '\'') {
+ output[at++] = ch;
+ }
+ }
+ String term = new String(output, 0, at);
+ MATCHER.reset(term);
+ if (MATCHER.matches() && !term.startsWith("a0")) {
+ termAtt.setEmpty();
+ termAtt.append(term);
+ return true;
+ }
+ }
+ }
+ return false;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/e0573de3/community/mahout-mr/integration/src/main/java/org/apache/mahout/text/MultipleTextFileInputFormat.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/integration/src/main/java/org/apache/mahout/text/MultipleTextFileInputFormat.java b/community/mahout-mr/integration/src/main/java/org/apache/mahout/text/MultipleTextFileInputFormat.java
new file mode 100644
index 0000000..44df006
--- /dev/null
+++ b/community/mahout-mr/integration/src/main/java/org/apache/mahout/text/MultipleTextFileInputFormat.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.text;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader;
+import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
+
+/**
+ *
+ * Used in combining a large number of text files into one text input reader
+ * along with the WholeFileRecordReader class.
+ *
+ */
+public class MultipleTextFileInputFormat extends CombineFileInputFormat<IntWritable, BytesWritable> {
+
+ @Override
+ public RecordReader<IntWritable, BytesWritable> createRecordReader(InputSplit inputSplit,
+ TaskAttemptContext taskAttemptContext)
+ throws IOException {
+ return new CombineFileRecordReader<>((CombineFileSplit) inputSplit,
+ taskAttemptContext, WholeFileRecordReader.class);
+ }
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/e0573de3/community/mahout-mr/integration/src/main/java/org/apache/mahout/text/PrefixAdditionFilter.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/integration/src/main/java/org/apache/mahout/text/PrefixAdditionFilter.java b/community/mahout-mr/integration/src/main/java/org/apache/mahout/text/PrefixAdditionFilter.java
new file mode 100644
index 0000000..37ebc44
--- /dev/null
+++ b/community/mahout-mr/integration/src/main/java/org/apache/mahout/text/PrefixAdditionFilter.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.text;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.mahout.common.iterator.FileLineIterable;
+import org.apache.mahout.utils.io.ChunkedWriter;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.Charset;
+import java.util.Map;
+
+/**
+ * Default parser for parsing text into sequence files.
+ */
+public final class PrefixAdditionFilter extends SequenceFilesFromDirectoryFilter {
+
+ public PrefixAdditionFilter(Configuration conf,
+ String keyPrefix,
+ Map<String, String> options,
+ ChunkedWriter writer,
+ Charset charset,
+ FileSystem fs) {
+ super(conf, keyPrefix, options, writer, charset, fs);
+ }
+
+ @Override
+ protected void process(FileStatus fst, Path current) throws IOException {
+ FileSystem fs = getFs();
+ ChunkedWriter writer = getWriter();
+ if (fst.isDir()) {
+ String dirPath = getPrefix() + Path.SEPARATOR + current.getName() + Path.SEPARATOR + fst.getPath().getName();
+ fs.listStatus(fst.getPath(),
+ new PrefixAdditionFilter(getConf(), dirPath, getOptions(), writer, getCharset(), fs));
+ } else {
+ try (InputStream in = fs.open(fst.getPath())){
+ StringBuilder file = new StringBuilder();
+ for (String aFit : new FileLineIterable(in, getCharset(), false)) {
+ file.append(aFit).append('\n');
+ }
+ String name = current.getName().equals(fst.getPath().getName())
+ ? current.getName()
+ : current.getName() + Path.SEPARATOR + fst.getPath().getName();
+ writer.write(getPrefix() + Path.SEPARATOR + name, file.toString());
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/e0573de3/community/mahout-mr/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromDirectory.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromDirectory.java b/community/mahout-mr/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromDirectory.java
new file mode 100644
index 0000000..311ab8d
--- /dev/null
+++ b/community/mahout-mr/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromDirectory.java
@@ -0,0 +1,214 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.text;
+
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.mahout.common.AbstractJob;
+import org.apache.mahout.common.ClassUtils;
+import org.apache.mahout.common.HadoopUtil;
+import org.apache.mahout.common.commandline.DefaultOptionCreator;
+import org.apache.mahout.utils.io.ChunkedWriter;
+
+/**
+ * Converts a directory of text documents into SequenceFiles of Specified chunkSize. This class takes in a
+ * parent directory containing sub folders of text documents and recursively reads the files and creates the
+ * {@link org.apache.hadoop.io.SequenceFile}s of docid => content. The docid is set as the relative path of the
+ * document from the parent directory prepended with a specified prefix. You can also specify the input encoding
+ * of the text files. The content of the output SequenceFiles are encoded as UTF-8 text.
+ */
+public class SequenceFilesFromDirectory extends AbstractJob {
+
+ private static final String PREFIX_ADDITION_FILTER = PrefixAdditionFilter.class.getName();
+
+ private static final String[] CHUNK_SIZE_OPTION = {"chunkSize", "chunk"};
+ public static final String[] FILE_FILTER_CLASS_OPTION = {"fileFilterClass", "filter"};
+ private static final String[] CHARSET_OPTION = {"charset", "c"};
+
+ private static final int MAX_JOB_SPLIT_LOCATIONS = 1000000;
+
+ public static final String[] KEY_PREFIX_OPTION = {"keyPrefix", "prefix"};
+ public static final String BASE_INPUT_PATH = "baseinputpath";
+
+ public static void main(String[] args) throws Exception {
+ ToolRunner.run(new SequenceFilesFromDirectory(), args);
+ }
+
+ /*
+ * callback main after processing MapReduce parameters
+ */
+ @Override
+ public int run(String[] args) throws Exception {
+ addOptions();
+ addOption(DefaultOptionCreator.methodOption().create());
+ addOption(DefaultOptionCreator.overwriteOption().create());
+
+ if (parseArguments(args) == null) {
+ return -1;
+ }
+
+ Map<String, String> options = parseOptions();
+ Path output = getOutputPath();
+ if (hasOption(DefaultOptionCreator.OVERWRITE_OPTION)) {
+ HadoopUtil.delete(getConf(), output);
+ }
+
+ if (getOption(DefaultOptionCreator.METHOD_OPTION,
+ DefaultOptionCreator.MAPREDUCE_METHOD).equals(DefaultOptionCreator.SEQUENTIAL_METHOD)) {
+ runSequential(getConf(), getInputPath(), output, options);
+ } else {
+ runMapReduce(getInputPath(), output);
+ }
+
+ return 0;
+ }
+
+ private int runSequential(Configuration conf, Path input, Path output, Map<String, String> options)
+ throws IOException, InterruptedException, NoSuchMethodException {
+ // Running sequentially
+ Charset charset = Charset.forName(getOption(CHARSET_OPTION[0]));
+ String keyPrefix = getOption(KEY_PREFIX_OPTION[0]);
+ FileSystem fs = FileSystem.get(input.toUri(), conf);
+
+ try (ChunkedWriter writer = new ChunkedWriter(conf, Integer.parseInt(options.get(CHUNK_SIZE_OPTION[0])), output)) {
+ SequenceFilesFromDirectoryFilter pathFilter;
+ String fileFilterClassName = options.get(FILE_FILTER_CLASS_OPTION[0]);
+ if (PrefixAdditionFilter.class.getName().equals(fileFilterClassName)) {
+ pathFilter = new PrefixAdditionFilter(conf, keyPrefix, options, writer, charset, fs);
+ } else {
+ pathFilter = ClassUtils.instantiateAs(fileFilterClassName, SequenceFilesFromDirectoryFilter.class,
+ new Class[] {Configuration.class, String.class, Map.class, ChunkedWriter.class, Charset.class, FileSystem.class},
+ new Object[] {conf, keyPrefix, options, writer, charset, fs});
+ }
+ fs.listStatus(input, pathFilter);
+ }
+ return 0;
+ }
+
+ private int runMapReduce(Path input, Path output) throws IOException, ClassNotFoundException, InterruptedException {
+
+ int chunkSizeInMB = 64;
+ if (hasOption(CHUNK_SIZE_OPTION[0])) {
+ chunkSizeInMB = Integer.parseInt(getOption(CHUNK_SIZE_OPTION[0]));
+ }
+
+ String keyPrefix = null;
+ if (hasOption(KEY_PREFIX_OPTION[0])) {
+ keyPrefix = getOption(KEY_PREFIX_OPTION[0]);
+ }
+
+ String fileFilterClassName = null;
+ if (hasOption(FILE_FILTER_CLASS_OPTION[0])) {
+ fileFilterClassName = getOption(FILE_FILTER_CLASS_OPTION[0]);
+ }
+
+ PathFilter pathFilter = null;
+ // Prefix Addition is presently handled in the Mapper and unlike runsequential()
+ // need not be done via a pathFilter
+ if (!StringUtils.isBlank(fileFilterClassName) && !PrefixAdditionFilter.class.getName().equals(fileFilterClassName)) {
+ try {
+ pathFilter = (PathFilter) Class.forName(fileFilterClassName).newInstance();
+ } catch (InstantiationException | IllegalAccessException e) {
+ throw new IllegalStateException(e);
+ }
+ }
+
+ // Prepare Job for submission.
+ Job job = prepareJob(input, output, MultipleTextFileInputFormat.class,
+ SequenceFilesFromDirectoryMapper.class, Text.class, Text.class,
+ SequenceFileOutputFormat.class, "SequenceFilesFromDirectory");
+
+ Configuration jobConfig = job.getConfiguration();
+ jobConfig.set(KEY_PREFIX_OPTION[0], keyPrefix);
+ jobConfig.set(FILE_FILTER_CLASS_OPTION[0], fileFilterClassName);
+
+ FileSystem fs = FileSystem.get(jobConfig);
+ FileStatus fsFileStatus = fs.getFileStatus(input);
+
+ String inputDirList;
+ if (pathFilter != null) {
+ inputDirList = HadoopUtil.buildDirList(fs, fsFileStatus, pathFilter);
+ } else {
+ inputDirList = HadoopUtil.buildDirList(fs, fsFileStatus);
+ }
+
+ jobConfig.set(BASE_INPUT_PATH, input.toString());
+
+ long chunkSizeInBytes = chunkSizeInMB * 1024 * 1024;
+
+ // set the max split locations, otherwise we get nasty debug stuff
+ jobConfig.set("mapreduce.job.max.split.locations", String.valueOf(MAX_JOB_SPLIT_LOCATIONS));
+
+ FileInputFormat.setInputPaths(job, inputDirList);
+ // need to set this to a multiple of the block size, or no split happens
+ FileInputFormat.setMaxInputSplitSize(job, chunkSizeInBytes);
+ FileOutputFormat.setCompressOutput(job, true);
+
+ boolean succeeded = job.waitForCompletion(true);
+ if (!succeeded) {
+ return -1;
+ }
+ return 0;
+ }
+
+ /**
+ * Override this method in order to add additional options to the command line of the SequenceFileFromDirectory job.
+ * Do not forget to call super() otherwise all standard options (input/output dirs etc) will not be available.
+ */
+ protected void addOptions() {
+ addInputOption();
+ addOutputOption();
+ addOption(DefaultOptionCreator.overwriteOption().create());
+ addOption(DefaultOptionCreator.methodOption().create());
+ addOption(CHUNK_SIZE_OPTION[0], CHUNK_SIZE_OPTION[1], "The chunkSize in MegaBytes. Defaults to 64", "64");
+ addOption(FILE_FILTER_CLASS_OPTION[0], FILE_FILTER_CLASS_OPTION[1],
+ "The name of the class to use for file parsing. Default: " + PREFIX_ADDITION_FILTER, PREFIX_ADDITION_FILTER);
+ addOption(KEY_PREFIX_OPTION[0], KEY_PREFIX_OPTION[1], "The prefix to be prepended to the key", "");
+ addOption(CHARSET_OPTION[0], CHARSET_OPTION[1],
+ "The name of the character encoding of the input files. Default to UTF-8", "UTF-8");
+ }
+
+ /**
+ * Override this method in order to parse your additional options from the command line. Do not forget to call
+ * super() otherwise standard options (input/output dirs etc) will not be available.
+ *
+ * @return Map of options
+ */
+ protected Map<String, String> parseOptions() {
+ Map<String, String> options = new HashMap<>();
+ options.put(CHUNK_SIZE_OPTION[0], getOption(CHUNK_SIZE_OPTION[0]));
+ options.put(FILE_FILTER_CLASS_OPTION[0], getOption(FILE_FILTER_CLASS_OPTION[0]));
+ options.put(CHARSET_OPTION[0], getOption(CHARSET_OPTION[0]));
+ return options;
+ }
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/e0573de3/community/mahout-mr/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromDirectoryFilter.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromDirectoryFilter.java b/community/mahout-mr/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromDirectoryFilter.java
new file mode 100644
index 0000000..6e4bd64
--- /dev/null
+++ b/community/mahout-mr/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromDirectoryFilter.java
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.text;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.mahout.utils.io.ChunkedWriter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.util.Map;
+
+/**
+ * Implement this interface if you wish to extend SequenceFilesFromDirectory with your own parsing logic.
+ */
+public abstract class SequenceFilesFromDirectoryFilter implements PathFilter {
+ private static final Logger log = LoggerFactory.getLogger(SequenceFilesFromDirectoryFilter.class);
+
+ private final String prefix;
+ private final ChunkedWriter writer;
+ private final Charset charset;
+ private final FileSystem fs;
+ private final Map<String, String> options;
+ private final Configuration conf;
+
+ protected SequenceFilesFromDirectoryFilter(Configuration conf,
+ String keyPrefix,
+ Map<String, String> options,
+ ChunkedWriter writer,
+ Charset charset,
+ FileSystem fs) {
+ this.prefix = keyPrefix;
+ this.writer = writer;
+ this.charset = charset;
+ this.fs = fs;
+ this.options = options;
+ this.conf = conf;
+ }
+
+ protected final String getPrefix() {
+ return prefix;
+ }
+
+ protected final ChunkedWriter getWriter() {
+ return writer;
+ }
+
+ protected final Charset getCharset() {
+ return charset;
+ }
+
+ protected final FileSystem getFs() {
+ return fs;
+ }
+
+ protected final Map<String, String> getOptions() {
+ return options;
+ }
+
+ protected final Configuration getConf() {
+ return conf;
+ }
+
+ @Override
+ public final boolean accept(Path current) {
+ log.debug("CURRENT: {}", current.getName());
+ try {
+ for (FileStatus fst : fs.listStatus(current)) {
+ log.debug("CHILD: {}", fst.getPath().getName());
+ process(fst, current);
+ }
+ } catch (IOException ioe) {
+ throw new IllegalStateException(ioe);
+ }
+ return false;
+ }
+
+ protected abstract void process(FileStatus in, Path current) throws IOException;
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/e0573de3/community/mahout-mr/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromDirectoryMapper.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromDirectoryMapper.java b/community/mahout-mr/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromDirectoryMapper.java
new file mode 100644
index 0000000..40df3c2
--- /dev/null
+++ b/community/mahout-mr/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromDirectoryMapper.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.text;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
+import org.apache.mahout.common.HadoopUtil;
+
+import static org.apache.mahout.text.SequenceFilesFromDirectory.KEY_PREFIX_OPTION;
+
+/**
+ * Map class for SequenceFilesFromDirectory MR job
+ */
+public class SequenceFilesFromDirectoryMapper extends Mapper<IntWritable, BytesWritable, Text, Text> {
+
+ private String keyPrefix;
+ private Text fileValue = new Text();
+
+ @Override
+ protected void setup(Context context) throws IOException, InterruptedException {
+ super.setup(context);
+ this.keyPrefix = context.getConfiguration().get(KEY_PREFIX_OPTION[0], "");
+ }
+
+ public void map(IntWritable key, BytesWritable value, Context context)
+ throws IOException, InterruptedException {
+
+ Configuration configuration = context.getConfiguration();
+ Path filePath = ((CombineFileSplit) context.getInputSplit()).getPath(key.get());
+ String relativeFilePath = HadoopUtil.calcRelativeFilePath(configuration, filePath);
+
+ String filename = this.keyPrefix.length() > 0 ?
+ this.keyPrefix + Path.SEPARATOR + relativeFilePath :
+ Path.SEPARATOR + relativeFilePath;
+
+ fileValue.set(value.getBytes(), 0, value.getBytes().length);
+ context.write(new Text(filename), fileValue);
+ }
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/e0573de3/community/mahout-mr/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromMailArchives.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromMailArchives.java b/community/mahout-mr/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromMailArchives.java
new file mode 100644
index 0000000..c17cc12
--- /dev/null
+++ b/community/mahout-mr/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromMailArchives.java
@@ -0,0 +1,369 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.mahout.text;
+
+import org.apache.commons.io.DirectoryWalker;
+import org.apache.commons.io.comparator.CompositeFileComparator;
+import org.apache.commons.io.comparator.DirectoryFileComparator;
+import org.apache.commons.io.comparator.PathFileComparator;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.mahout.common.AbstractJob;
+import org.apache.mahout.common.HadoopUtil;
+import org.apache.mahout.common.commandline.DefaultOptionCreator;
+import org.apache.mahout.utils.email.MailOptions;
+import org.apache.mahout.utils.email.MailProcessor;
+import org.apache.mahout.utils.io.ChunkedWriter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Pattern;
+
+/**
+ * Converts a directory of gzipped mail archives into SequenceFiles of specified
+ * chunkSize. This class is similar to {@link SequenceFilesFromDirectory} except
+ * it uses block-compressed {@link org.apache.hadoop.io.SequenceFile}s and parses out the subject and
+ * body text of each mail message into a separate key/value pair.
+ */
+public final class SequenceFilesFromMailArchives extends AbstractJob {
+
+ private static final Logger log = LoggerFactory.getLogger(SequenceFilesFromMailArchives.class);
+
+ public static final String[] CHUNK_SIZE_OPTION = {"chunkSize", "chunk"};
+ public static final String[] KEY_PREFIX_OPTION = {"keyPrefix", "prefix"};
+ public static final String[] CHARSET_OPTION = {"charset", "c"};
+ public static final String[] SUBJECT_OPTION = {"subject", "s"};
+ public static final String[] TO_OPTION = {"to", "to"};
+ public static final String[] FROM_OPTION = {"from", "from"};
+ public static final String[] REFERENCES_OPTION = {"references", "refs"};
+ public static final String[] BODY_OPTION = {"body", "b"};
+ public static final String[] STRIP_QUOTED_OPTION = {"stripQuoted", "q"};
+ public static final String[] QUOTED_REGEX_OPTION = {"quotedRegex", "regex"};
+ public static final String[] SEPARATOR_OPTION = {"separator", "sep"};
+ public static final String[] BODY_SEPARATOR_OPTION = {"bodySeparator", "bodySep"};
+ public static final String BASE_INPUT_PATH = "baseinputpath";
+
+ private static final int MAX_JOB_SPLIT_LOCATIONS = 1000000;
+
+ public void createSequenceFiles(MailOptions options) throws IOException {
+ try (ChunkedWriter writer =
+ new ChunkedWriter(getConf(), options.getChunkSize(), new Path(options.getOutputDir()))){
+ MailProcessor processor = new MailProcessor(options, options.getPrefix(), writer);
+ if (options.getInput().isDirectory()) {
+ PrefixAdditionDirectoryWalker walker = new PrefixAdditionDirectoryWalker(processor, writer);
+ walker.walk(options.getInput());
+ log.info("Parsed {} messages from {}", walker.getMessageCount(), options.getInput().getAbsolutePath());
+ } else {
+ long start = System.currentTimeMillis();
+ long cnt = processor.parseMboxLineByLine(options.getInput());
+ long finish = System.currentTimeMillis();
+ log.info("Parsed {} messages from {} in time: {}", cnt, options.getInput().getAbsolutePath(), finish - start);
+ }
+ }
+ }
+
+ private static class PrefixAdditionDirectoryWalker extends DirectoryWalker<Object> {
+
+ @SuppressWarnings("unchecked")
+ private static final Comparator<File> FILE_COMPARATOR = new CompositeFileComparator(
+ DirectoryFileComparator.DIRECTORY_REVERSE, PathFileComparator.PATH_COMPARATOR);
+
+ private final Deque<MailProcessor> processors = new ArrayDeque<>();
+ private final ChunkedWriter writer;
+ private final Deque<Long> messageCounts = new ArrayDeque<>();
+
+ public PrefixAdditionDirectoryWalker(MailProcessor processor, ChunkedWriter writer) {
+ processors.addFirst(processor);
+ this.writer = writer;
+ messageCounts.addFirst(0L);
+ }
+
+ public void walk(File startDirectory) throws IOException {
+ super.walk(startDirectory, null);
+ }
+
+ public long getMessageCount() {
+ return messageCounts.getFirst();
+ }
+
+ @Override
+ protected void handleDirectoryStart(File current, int depth, Collection<Object> results) throws IOException {
+ if (depth > 0) {
+ log.info("At {}", current.getAbsolutePath());
+ MailProcessor processor = processors.getFirst();
+ MailProcessor subDirProcessor = new MailProcessor(processor.getOptions(), processor.getPrefix()
+ + File.separator + current.getName(), writer);
+ processors.push(subDirProcessor);
+ messageCounts.push(0L);
+ }
+ }
+
+ @Override
+ protected File[] filterDirectoryContents(File directory, int depth, File[] files) throws IOException {
+ Arrays.sort(files, FILE_COMPARATOR);
+ return files;
+ }
+
+ @Override
+ protected void handleFile(File current, int depth, Collection<Object> results) throws IOException {
+ MailProcessor processor = processors.getFirst();
+ long currentDirMessageCount = messageCounts.pop();
+ try {
+ currentDirMessageCount += processor.parseMboxLineByLine(current);
+ } catch (IOException e) {
+ throw new IllegalStateException("Error processing " + current, e);
+ }
+ messageCounts.push(currentDirMessageCount);
+ }
+
+ @Override
+ protected void handleDirectoryEnd(File current, int depth, Collection<Object> results) throws IOException {
+ if (depth > 0) {
+ final long currentDirMessageCount = messageCounts.pop();
+ log.info("Parsed {} messages from directory {}", currentDirMessageCount, current.getAbsolutePath());
+
+ processors.pop();
+
+ // aggregate message counts
+ long parentDirMessageCount = messageCounts.pop();
+ parentDirMessageCount += currentDirMessageCount;
+ messageCounts.push(parentDirMessageCount);
+ }
+ }
+ }
+
+ public static void main(String[] args) throws Exception {
+ ToolRunner.run(new Configuration(), new SequenceFilesFromMailArchives(), args);
+ }
+
+ @Override
+ public int run(String[] args) throws Exception {
+ addInputOption();
+ addOutputOption();
+ addOption(DefaultOptionCreator.methodOption().create());
+
+ addOption(CHUNK_SIZE_OPTION[0], CHUNK_SIZE_OPTION[1], "The chunkSize in MegaBytes. Defaults to 64", "64");
+ addOption(KEY_PREFIX_OPTION[0], KEY_PREFIX_OPTION[1], "The prefix to be prepended to the key", "");
+ addOption(CHARSET_OPTION[0], CHARSET_OPTION[1],
+ "The name of the character encoding of the input files. Default to UTF-8", "UTF-8");
+ addFlag(SUBJECT_OPTION[0], SUBJECT_OPTION[1], "Include the Mail subject as part of the text. Default is false");
+ addFlag(TO_OPTION[0], TO_OPTION[1], "Include the to field in the text. Default is false");
+ addFlag(FROM_OPTION[0], FROM_OPTION[1], "Include the from field in the text. Default is false");
+ addFlag(REFERENCES_OPTION[0], REFERENCES_OPTION[1],
+ "Include the references field in the text. Default is false");
+ addFlag(BODY_OPTION[0], BODY_OPTION[1], "Include the body in the output. Default is false");
+ addFlag(STRIP_QUOTED_OPTION[0], STRIP_QUOTED_OPTION[1],
+ "Strip (remove) quoted email text in the body. Default is false");
+ addOption(QUOTED_REGEX_OPTION[0], QUOTED_REGEX_OPTION[1],
+ "Specify the regex that identifies quoted text. "
+ + "Default is to look for > or | at the beginning of the line.");
+ addOption(SEPARATOR_OPTION[0], SEPARATOR_OPTION[1],
+ "The separator to use between metadata items (to, from, etc.). Default is \\n", "\n");
+ addOption(BODY_SEPARATOR_OPTION[0], BODY_SEPARATOR_OPTION[1],
+ "The separator to use between lines in the body. Default is \\n. "
+ + "Useful to change if you wish to have the message be on one line", "\n");
+
+ addOption(DefaultOptionCreator.helpOption());
+ Map<String, List<String>> parsedArgs = parseArguments(args);
+ if (parsedArgs == null) {
+ return -1;
+ }
+ File input = getInputFile();
+ String outputDir = getOutputPath().toString();
+
+ int chunkSize = 64;
+ if (hasOption(CHUNK_SIZE_OPTION[0])) {
+ chunkSize = Integer.parseInt(getOption(CHUNK_SIZE_OPTION[0]));
+ }
+
+ String prefix = "";
+ if (hasOption(KEY_PREFIX_OPTION[0])) {
+ prefix = getOption(KEY_PREFIX_OPTION[0]);
+ }
+
+ Charset charset = Charset.forName(getOption(CHARSET_OPTION[0]));
+ MailOptions options = new MailOptions();
+ options.setInput(input);
+ options.setOutputDir(outputDir);
+ options.setPrefix(prefix);
+ options.setChunkSize(chunkSize);
+ options.setCharset(charset);
+
+ List<Pattern> patterns = new ArrayList<>(5);
+ // patternOrder is used downstream so that we can know what order the text
+ // is in instead of encoding it in the string, which
+ // would require more processing later to remove it pre feature selection.
+ Map<String, Integer> patternOrder = new HashMap<>();
+ int order = 0;
+ if (hasOption(FROM_OPTION[0])) {
+ patterns.add(MailProcessor.FROM_PREFIX);
+ patternOrder.put(MailOptions.FROM, order++);
+ }
+ if (hasOption(TO_OPTION[0])) {
+ patterns.add(MailProcessor.TO_PREFIX);
+ patternOrder.put(MailOptions.TO, order++);
+ }
+ if (hasOption(REFERENCES_OPTION[0])) {
+ patterns.add(MailProcessor.REFS_PREFIX);
+ patternOrder.put(MailOptions.REFS, order++);
+ }
+ if (hasOption(SUBJECT_OPTION[0])) {
+ patterns.add(MailProcessor.SUBJECT_PREFIX);
+ patternOrder.put(MailOptions.SUBJECT, order += 1);
+ }
+ options.setStripQuotedText(hasOption(STRIP_QUOTED_OPTION[0]));
+
+ options.setPatternsToMatch(patterns.toArray(new Pattern[patterns.size()]));
+ options.setPatternOrder(patternOrder);
+ options.setIncludeBody(hasOption(BODY_OPTION[0]));
+
+ if (hasOption(SEPARATOR_OPTION[0])) {
+ options.setSeparator(getOption(SEPARATOR_OPTION[0]));
+ } else {
+ options.setSeparator("\n");
+ }
+
+ if (hasOption(BODY_SEPARATOR_OPTION[0])) {
+ options.setBodySeparator(getOption(BODY_SEPARATOR_OPTION[0]));
+ }
+
+ if (hasOption(QUOTED_REGEX_OPTION[0])) {
+ options.setQuotedTextPattern(Pattern.compile(getOption(QUOTED_REGEX_OPTION[0])));
+ }
+
+ if (getOption(DefaultOptionCreator.METHOD_OPTION,
+ DefaultOptionCreator.MAPREDUCE_METHOD).equals(DefaultOptionCreator.SEQUENTIAL_METHOD)) {
+ runSequential(options);
+ } else {
+ runMapReduce(getInputPath(), getOutputPath());
+ }
+
+ return 0;
+ }
+
+ private int runSequential(MailOptions options)
+ throws IOException, InterruptedException, NoSuchMethodException {
+
+ long start = System.currentTimeMillis();
+ createSequenceFiles(options);
+ long finish = System.currentTimeMillis();
+ log.info("Conversion took {}ms", finish - start);
+
+ return 0;
+ }
+
+ private int runMapReduce(Path input, Path output) throws IOException, InterruptedException, ClassNotFoundException {
+
+ Job job = prepareJob(input, output, MultipleTextFileInputFormat.class, SequenceFilesFromMailArchivesMapper.class,
+ Text.class, Text.class, SequenceFileOutputFormat.class, "SequentialFilesFromMailArchives");
+
+ Configuration jobConfig = job.getConfiguration();
+
+ if (hasOption(KEY_PREFIX_OPTION[0])) {
+ jobConfig.set(KEY_PREFIX_OPTION[1], getOption(KEY_PREFIX_OPTION[0]));
+ }
+
+ int chunkSize = 0;
+ if (hasOption(CHUNK_SIZE_OPTION[0])) {
+ chunkSize = Integer.parseInt(getOption(CHUNK_SIZE_OPTION[0]));
+ jobConfig.set(CHUNK_SIZE_OPTION[0], String.valueOf(chunkSize));
+ }
+
+ Charset charset;
+ if (hasOption(CHARSET_OPTION[0])) {
+ charset = Charset.forName(getOption(CHARSET_OPTION[0]));
+ jobConfig.set(CHARSET_OPTION[0], charset.displayName());
+ }
+
+ if (hasOption(FROM_OPTION[0])) {
+ jobConfig.set(FROM_OPTION[1], "true");
+ }
+
+ if (hasOption(TO_OPTION[0])) {
+ jobConfig.set(TO_OPTION[1], "true");
+ }
+
+ if (hasOption(REFERENCES_OPTION[0])) {
+ jobConfig.set(REFERENCES_OPTION[1], "true");
+ }
+
+ if (hasOption(SUBJECT_OPTION[0])) {
+ jobConfig.set(SUBJECT_OPTION[1], "true");
+ }
+
+ if (hasOption(QUOTED_REGEX_OPTION[0])) {
+ jobConfig.set(QUOTED_REGEX_OPTION[1], Pattern.compile(getOption(QUOTED_REGEX_OPTION[0])).toString());
+ }
+
+ if (hasOption(SEPARATOR_OPTION[0])) {
+ jobConfig.set(SEPARATOR_OPTION[1], getOption(SEPARATOR_OPTION[0]));
+ } else {
+ jobConfig.set(SEPARATOR_OPTION[1], "\n");
+ }
+
+ if (hasOption(BODY_OPTION[0])) {
+ jobConfig.set(BODY_OPTION[1], "true");
+ } else {
+ jobConfig.set(BODY_OPTION[1], "false");
+ }
+
+ if (hasOption(BODY_SEPARATOR_OPTION[0])) {
+ jobConfig.set(BODY_SEPARATOR_OPTION[1], getOption(BODY_SEPARATOR_OPTION[0]));
+ } else {
+ jobConfig.set(BODY_SEPARATOR_OPTION[1], "\n");
+ }
+
+ FileSystem fs = FileSystem.get(jobConfig);
+ FileStatus fsFileStatus = fs.getFileStatus(inputPath);
+
+ jobConfig.set(BASE_INPUT_PATH, inputPath.toString());
+ String inputDirList = HadoopUtil.buildDirList(fs, fsFileStatus);
+ FileInputFormat.setInputPaths(job, inputDirList);
+
+ long chunkSizeInBytes = chunkSize * 1024 * 1024;
+ // need to set this to a multiple of the block size, or no split happens
+ FileInputFormat.setMaxInputSplitSize(job, chunkSizeInBytes);
+
+ // set the max split locations, otherwise we get nasty debug stuff
+ jobConfig.set("mapreduce.job.max.split.locations", String.valueOf(MAX_JOB_SPLIT_LOCATIONS));
+
+ boolean succeeded = job.waitForCompletion(true);
+ if (!succeeded) {
+ return -1;
+ }
+ return 0;
+ }
+}