You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mahout.apache.org by ra...@apache.org on 2018/06/04 14:29:26 UTC

[24/53] [abbrv] [partial] mahout git commit: end of day 6-2-2018

http://git-wip-us.apache.org/repos/asf/mahout/blob/5eda9e1f/community/mahout-mr/src/main/java/org/apache/mahout/common/iterator/sequencefile/PathFilters.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/src/main/java/org/apache/mahout/common/iterator/sequencefile/PathFilters.java b/community/mahout-mr/src/main/java/org/apache/mahout/common/iterator/sequencefile/PathFilters.java
new file mode 100644
index 0000000..19f78b5
--- /dev/null
+++ b/community/mahout-mr/src/main/java/org/apache/mahout/common/iterator/sequencefile/PathFilters.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.common.iterator.sequencefile;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+
+/**
+ * Supplies some useful and repeatedly-used instances of {@link PathFilter}.
+ */
+public final class PathFilters {
+
+  /** Accepts names that begin with "part-" unless they end with ".crc". */
+  private static final PathFilter PART_FILE_INSTANCE = new PathFilter() {
+    @Override
+    public boolean accept(Path candidate) {
+      String fileName = candidate.getName();
+      return !fileName.endsWith(".crc") && fileName.startsWith("part-");
+    }
+  };
+
+  /** Accepts the final clustering file: names that begin with "clusters-" and end with "-final". */
+  private static final PathFilter CLUSTER_FINAL = new PathFilter() {
+    @Override
+    public boolean accept(Path candidate) {
+      String fileName = candidate.getName();
+      return fileName.endsWith("-final") && fileName.startsWith("clusters-");
+    }
+  };
+
+  /** Rejects names ending with ".crc" and names beginning with "." or "_". */
+  private static final PathFilter LOGS_CRC_INSTANCE = new PathFilter() {
+    @Override
+    public boolean accept(Path candidate) {
+      String fileName = candidate.getName();
+      return !fileName.endsWith(".crc") && !fileName.startsWith(".") && !fileName.startsWith("_");
+    }
+  };
+
+  private PathFilters() {
+  }
+
+  /**
+   * @return shared {@link PathFilter} accepting paths whose file name starts with "part-",
+   * except those ending with ".crc".
+   */
+  public static PathFilter partFilter() {
+    return PART_FILE_INSTANCE;
+  }
+
+  /**
+   * @return shared {@link PathFilter} accepting paths whose file name starts with "clusters-" and ends with "-final".
+   */
+  public static PathFilter finalPartFilter() {
+    return CLUSTER_FINAL;
+  }
+
+  /**
+   * @return shared {@link PathFilter} rejecting paths whose file name starts with "_" (e.g. Cloudera
+   * _SUCCESS files or Hadoop _logs), or "." (e.g. local hidden files), or ends with ".crc"
+   */
+  public static PathFilter logsCRCFilter() {
+    return LOGS_CRC_INSTANCE;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/5eda9e1f/community/mahout-mr/src/main/java/org/apache/mahout/common/iterator/sequencefile/PathType.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/src/main/java/org/apache/mahout/common/iterator/sequencefile/PathType.java b/community/mahout-mr/src/main/java/org/apache/mahout/common/iterator/sequencefile/PathType.java
new file mode 100644
index 0000000..7ea713e
--- /dev/null
+++ b/community/mahout-mr/src/main/java/org/apache/mahout/common/iterator/sequencefile/PathType.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.common.iterator.sequencefile;
+
+/**
+ * Used by {@link SequenceFileDirIterable} and the like to select whether the input path specifies a
+ * directory to list, or a glob pattern.
+ */
+public enum PathType {
+  GLOB, // the input path is interpreted as a glob pattern
+  LIST, // the input path is interpreted as a directory whose contents are listed
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/5eda9e1f/community/mahout-mr/src/main/java/org/apache/mahout/common/iterator/sequencefile/SequenceFileDirIterable.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/src/main/java/org/apache/mahout/common/iterator/sequencefile/SequenceFileDirIterable.java b/community/mahout-mr/src/main/java/org/apache/mahout/common/iterator/sequencefile/SequenceFileDirIterable.java
new file mode 100644
index 0000000..ca4d6b8
--- /dev/null
+++ b/community/mahout-mr/src/main/java/org/apache/mahout/common/iterator/sequencefile/SequenceFileDirIterable.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.common.iterator.sequencefile;
+
+import java.io.IOException;
+import java.util.Comparator;
+import java.util.Iterator;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.io.Writable;
+import org.apache.mahout.common.Pair;
+
+/**
+ * <p>{@link Iterable} counterpart to {@link SequenceFileDirIterator}.</p>
+ */
+public final class SequenceFileDirIterable<K extends Writable,V extends Writable> implements Iterable<Pair<K,V>> {
+
+  private final Path path;
+  private final PathType pathType;
+  private final PathFilter filter;
+  private final Comparator<FileStatus> ordering;
+  private final boolean reuseKeyValueInstances;
+  private final Configuration conf;
+
+  /** Same as the six-argument constructor with no filter, no ordering and no instance reuse. */
+  public SequenceFileDirIterable(Path path, PathType pathType, Configuration conf) {
+    this(path, pathType, null, null, false, conf);
+  }
+
+  /** Same as the six-argument constructor with no ordering and no instance reuse. */
+  public SequenceFileDirIterable(Path path, PathType pathType, PathFilter filter, Configuration conf) {
+    this(path, pathType, filter, null, false, conf);
+  }
+
+  /**
+   * @param path directory or glob pattern to iterate over
+   * @param pathType whether path is a directory ({@link PathType#LIST}) or a glob pattern ({@link PathType#GLOB})
+   * @param filter if not null, restricts the set of sequence files that are iterated over
+   * @param ordering if not null, specifies the order in which to iterate over matching sequence files
+   * @param reuseKeyValueInstances if true, reuses key and value instances instead of creating new ones per read
+   * @param conf configuration handed to every {@link SequenceFileDirIterator} this object creates
+   */
+  public SequenceFileDirIterable(Path path,
+                                 PathType pathType,
+                                 PathFilter filter,
+                                 Comparator<FileStatus> ordering,
+                                 boolean reuseKeyValueInstances,
+                                 Configuration conf) {
+    this.conf = conf;
+    this.reuseKeyValueInstances = reuseKeyValueInstances;
+    this.ordering = ordering;
+    this.filter = filter;
+    this.pathType = pathType;
+    this.path = path;
+  }
+
+  @Override
+  public Iterator<Pair<K,V>> iterator() {
+    try {
+      return new SequenceFileDirIterator<>(path, pathType, filter, ordering, reuseKeyValueInstances, conf);
+    } catch (IOException cause) {
+      throw new IllegalStateException(path.toString(), cause);
+    }
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/mahout/blob/5eda9e1f/community/mahout-mr/src/main/java/org/apache/mahout/common/iterator/sequencefile/SequenceFileDirIterator.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/src/main/java/org/apache/mahout/common/iterator/sequencefile/SequenceFileDirIterator.java b/community/mahout-mr/src/main/java/org/apache/mahout/common/iterator/sequencefile/SequenceFileDirIterator.java
new file mode 100644
index 0000000..cf6a871
--- /dev/null
+++ b/community/mahout-mr/src/main/java/org/apache/mahout/common/iterator/sequencefile/SequenceFileDirIterator.java
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.common.iterator.sequencefile;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+
+import com.google.common.base.Function;
+import com.google.common.collect.ForwardingIterator;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Lists;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.io.Writable;
+import org.apache.mahout.common.HadoopUtil;
+import org.apache.mahout.common.IOUtils;
+import org.apache.mahout.common.Pair;
+
+/**
+ * Like {@link SequenceFileIterator}, but iterates not just over one sequence file, but many. The input path
+ * may be specified as a directory of files to read, or as a glob pattern. The set of files may be optionally
+ * restricted with a {@link PathFilter}.
+ */
+public final class SequenceFileDirIterator<K extends Writable,V extends Writable>
+    extends ForwardingIterator<Pair<K,V>> implements Closeable {
+
+  private static final FileStatus[] NO_STATUSES = new FileStatus[0];
+
+  private Iterator<Pair<K,V>> delegate;
+  private final List<SequenceFileIterator<K,V>> iterators;
+
+  /**
+   * Multifile sequence file iterator where files are specified explicitly by path parameters.
+   */
+  public SequenceFileDirIterator(Path[] path,
+                                 boolean reuseKeyValueInstances,
+                                 Configuration conf) throws IOException {
+    iterators = Lists.newArrayList();
+    // we assume all files should exist, otherwise we will bail out.
+    FileSystem fs = FileSystem.get(path[0].toUri(), conf);
+    FileStatus[] statuses = new FileStatus[path.length];
+    for (int i = 0; i < statuses.length; i++) {
+      statuses[i] = fs.getFileStatus(path[i]);
+    }
+    init(statuses, reuseKeyValueInstances, conf);
+  }
+
+  /**
+   * Constructor that uses either {@link FileSystem#listStatus(Path)} or
+   * {@link FileSystem#globStatus(Path)} to obtain list of files to iterate over
+   * (depending on pathType parameter).
+   */
+  public SequenceFileDirIterator(Path path,
+                                 PathType pathType,
+                                 PathFilter filter,
+                                 Comparator<FileStatus> ordering,
+                                 boolean reuseKeyValueInstances,
+                                 Configuration conf) throws IOException {
+    FileStatus[] statuses = HadoopUtil.getFileStatus(path, pathType, filter, ordering, conf);
+    iterators = Lists.newArrayList();
+    init(statuses, reuseKeyValueInstances, conf);
+  }
+
+  /** Builds the lazily-concatenated delegate over the given files; a null array means "no files". */
+  private void init(FileStatus[] statuses,
+                    final boolean reuseKeyValueInstances,
+                    final Configuration conf) {
+    // Hadoop may return null instead of an empty array when nothing qualified;
+    // treat that corner case as an empty iteration rather than failing with an NPE.
+    if (statuses == null) {
+      statuses = NO_STATUSES;
+    }
+
+    Iterator<FileStatus> fileStatusIterator = Iterators.forArray(statuses);
+
+    // The transform is lazy: a file is only opened once iteration reaches it, and every
+    // reader opened that way is recorded in 'iterators' so that close() can release it.
+    Iterator<Iterator<Pair<K, V>>> fsIterators =
+      Iterators.transform(fileStatusIterator,
+        new Function<FileStatus, Iterator<Pair<K, V>>>() {
+          @Override
+          public Iterator<Pair<K, V>> apply(FileStatus from) {
+            try {
+              SequenceFileIterator<K, V> iterator = new SequenceFileIterator<>(from.getPath(),
+                  reuseKeyValueInstances, conf);
+              iterators.add(iterator);
+              return iterator;
+            } catch (IOException ioe) {
+              throw new IllegalStateException(from.getPath().toString(), ioe);
+            }
+          }
+        });
+
+    delegate = Iterators.concat(fsIterators);
+  }
+
+  @Override
+  protected Iterator<Pair<K,V>> delegate() {
+    return delegate;
+  }
+
+  /** Hands every reader opened so far to IOUtils.close in reverse order of opening, then forgets them. */
+  @Override
+  public void close() throws IOException {
+    // Reverse here, not in init(): the list is still empty when init() runs.
+    Collections.reverse(iterators);
+    try {
+      IOUtils.close(iterators);
+    } finally {
+      iterators.clear();
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/5eda9e1f/community/mahout-mr/src/main/java/org/apache/mahout/common/iterator/sequencefile/SequenceFileDirValueIterable.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/src/main/java/org/apache/mahout/common/iterator/sequencefile/SequenceFileDirValueIterable.java b/community/mahout-mr/src/main/java/org/apache/mahout/common/iterator/sequencefile/SequenceFileDirValueIterable.java
new file mode 100644
index 0000000..1cb4ebc
--- /dev/null
+++ b/community/mahout-mr/src/main/java/org/apache/mahout/common/iterator/sequencefile/SequenceFileDirValueIterable.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.common.iterator.sequencefile;
+
+import java.io.IOException;
+import java.util.Comparator;
+import java.util.Iterator;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * <p>{@link Iterable} counterpart to {@link SequenceFileDirValueIterator}.</p>
+ */
+public final class SequenceFileDirValueIterable<V extends Writable> implements Iterable<V> {
+
+  private final Path path;
+  private final PathType pathType;
+  private final PathFilter filter;
+  private final Comparator<FileStatus> ordering;
+  private final boolean reuseKeyValueInstances;
+  private final Configuration conf;
+
+  /** Same as the six-argument constructor with no filter, no ordering and no instance reuse. */
+  public SequenceFileDirValueIterable(Path path, PathType pathType, Configuration conf) {
+    this(path, pathType, null, null, false, conf);
+  }
+
+  /** Same as the six-argument constructor with no ordering and no instance reuse. */
+  public SequenceFileDirValueIterable(Path path, PathType pathType, PathFilter filter, Configuration conf) {
+    this(path, pathType, filter, null, false, conf);
+  }
+
+  /**
+   * @param path directory or glob pattern to iterate over
+   * @param pathType whether path is a directory ({@link PathType#LIST}) or a glob pattern ({@link PathType#GLOB})
+   * @param filter if not null, restricts the set of sequence files that are iterated over
+   * @param ordering if not null, specifies the order in which to iterate over matching sequence files
+   * @param reuseKeyValueInstances if true, reuses the value instance instead of creating a new one per read
+   * @param conf configuration handed to every {@link SequenceFileDirValueIterator} this object creates
+   */
+  public SequenceFileDirValueIterable(Path path,
+                                      PathType pathType,
+                                      PathFilter filter,
+                                      Comparator<FileStatus> ordering,
+                                      boolean reuseKeyValueInstances,
+                                      Configuration conf) {
+    this.conf = conf;
+    this.reuseKeyValueInstances = reuseKeyValueInstances;
+    this.ordering = ordering;
+    this.filter = filter;
+    this.pathType = pathType;
+    this.path = path;
+  }
+
+  @Override
+  public Iterator<V> iterator() {
+    try {
+      return new SequenceFileDirValueIterator<>(path, pathType, filter, ordering, reuseKeyValueInstances, conf);
+    } catch (IOException cause) {
+      throw new IllegalStateException(path.toString(), cause);
+    }
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/mahout/blob/5eda9e1f/community/mahout-mr/src/main/java/org/apache/mahout/common/iterator/sequencefile/SequenceFileDirValueIterator.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/src/main/java/org/apache/mahout/common/iterator/sequencefile/SequenceFileDirValueIterator.java b/community/mahout-mr/src/main/java/org/apache/mahout/common/iterator/sequencefile/SequenceFileDirValueIterator.java
new file mode 100644
index 0000000..908c8bb
--- /dev/null
+++ b/community/mahout-mr/src/main/java/org/apache/mahout/common/iterator/sequencefile/SequenceFileDirValueIterator.java
@@ -0,0 +1,159 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.common.iterator.sequencefile;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+
+import com.google.common.base.Function;
+import com.google.common.collect.ForwardingIterator;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Lists;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.io.Writable;
+import org.apache.mahout.common.IOUtils;
+
+/**
+ * Like {@link SequenceFileValueIterator}, but iterates not just over one
+ * sequence file, but many. The input path may be specified as a directory of
+ * files to read, or as a glob pattern. The set of files may be optionally
+ * restricted with a {@link PathFilter}.
+ */
+public final class SequenceFileDirValueIterator<V extends Writable> extends
+    ForwardingIterator<V> implements Closeable {
+
+  private static final FileStatus[] NO_STATUSES = new FileStatus[0];
+
+  private Iterator<V> delegate;
+  private final List<SequenceFileValueIterator<V>> iterators;
+
+  /**
+   * Constructor that uses either {@link FileSystem#listStatus(Path)} or
+   * {@link FileSystem#globStatus(Path)} to obtain list of files to iterate over
+   * (depending on pathType parameter).
+   */
+  public SequenceFileDirValueIterator(Path path,
+                                      PathType pathType,
+                                      PathFilter filter,
+                                      Comparator<FileStatus> ordering,
+                                      boolean reuseKeyValueInstances,
+                                      Configuration conf) throws IOException {
+    FileStatus[] statuses;
+    FileSystem fs = FileSystem.get(path.toUri(), conf);
+    if (filter == null) {
+      statuses = pathType == PathType.GLOB ? fs.globStatus(path) : fs.listStatus(path);
+    } else {
+      statuses = pathType == PathType.GLOB ? fs.globStatus(path, filter) : fs.listStatus(path, filter);
+    }
+    iterators = Lists.newArrayList();
+    init(statuses, ordering, reuseKeyValueInstances, conf);
+  }
+
+  /**
+   * Multifile sequence file iterator where files are specified explicitly by
+   * path parameters.
+   */
+  public SequenceFileDirValueIterator(Path[] path,
+                                      Comparator<FileStatus> ordering,
+                                      boolean reuseKeyValueInstances,
+                                      Configuration conf) throws IOException {
+
+    iterators = Lists.newArrayList();
+    /*
+     * we assume all files should exist, otherwise we will bail out.
+     */
+    FileSystem fs = FileSystem.get(path[0].toUri(), conf);
+    FileStatus[] statuses = new FileStatus[path.length];
+    for (int i = 0; i < statuses.length; i++) {
+      statuses[i] = fs.getFileStatus(path[i]);
+    }
+    init(statuses, ordering, reuseKeyValueInstances, conf);
+  }
+
+  /** Orders the files if requested and builds the lazily-concatenated delegate; null statuses means no files. */
+  private void init(FileStatus[] statuses,
+                    Comparator<FileStatus> ordering,
+                    final boolean reuseKeyValueInstances,
+                    final Configuration conf) {
+
+    /*
+     * prevent NPEs. Unfortunately, Hadoop would return null for list if nothing
+     * was qualified. In this case, which is a corner case, we should assume an
+     * empty iterator, not an NPE.
+     */
+    if (statuses == null) {
+      statuses = NO_STATUSES;
+    }
+
+    if (ordering != null) {
+      Arrays.sort(statuses, ordering);
+    }
+    Iterator<FileStatus> fileStatusIterator = Iterators.forArray(statuses);
+
+    /*
+     * The transform below is lazy: no file is opened here, only when iteration
+     * reaches it. Hence nothing can leak out of the constructor, and an open
+     * failure surfaces during iteration as an IllegalStateException; readers
+     * opened before that point stay registered in 'iterators' until close().
+     */
+    Iterator<Iterator<V>> fsIterators =
+      Iterators.transform(fileStatusIterator,
+        new Function<FileStatus, Iterator<V>>() {
+          @Override
+          public Iterator<V> apply(FileStatus from) {
+            try {
+              SequenceFileValueIterator<V> iterator = new SequenceFileValueIterator<>(from.getPath(),
+                  reuseKeyValueInstances, conf);
+              iterators.add(iterator);
+              return iterator;
+            } catch (IOException ioe) {
+              throw new IllegalStateException(from.getPath().toString(), ioe);
+            }
+          }
+        });
+
+    delegate = Iterators.concat(fsIterators);
+  }
+
+  @Override
+  protected Iterator<V> delegate() {
+    return delegate;
+  }
+
+  /** Hands every reader opened so far to IOUtils.close in reverse order of opening, then forgets them. */
+  @Override
+  public void close() throws IOException {
+    // Reverse here, not in init(): the list is still empty when init() runs.
+    Collections.reverse(iterators);
+    try {
+      IOUtils.close(iterators);
+    } finally {
+      iterators.clear();
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/5eda9e1f/community/mahout-mr/src/main/java/org/apache/mahout/common/iterator/sequencefile/SequenceFileIterable.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/src/main/java/org/apache/mahout/common/iterator/sequencefile/SequenceFileIterable.java b/community/mahout-mr/src/main/java/org/apache/mahout/common/iterator/sequencefile/SequenceFileIterable.java
new file mode 100644
index 0000000..f17c2a1
--- /dev/null
+++ b/community/mahout-mr/src/main/java/org/apache/mahout/common/iterator/sequencefile/SequenceFileIterable.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.common.iterator.sequencefile;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Writable;
+import org.apache.mahout.common.Pair;
+
+/**
+ * <p>{@link Iterable} counterpart to {@link SequenceFileIterator}.</p>
+ */
+public final class SequenceFileIterable<K extends Writable,V extends Writable> implements Iterable<Pair<K,V>> {
+
+  private final Path path;
+  private final boolean reuseKeyValueInstances;
+  private final Configuration conf;
+
+  /**
+   * Creates an iterable whose iterators allocate fresh key and value instances for every record.
+   *
+   * @param path file to iterate over
+   * @param conf configuration handed to every {@link SequenceFileIterator} this object creates
+   */
+  public SequenceFileIterable(Path path, Configuration conf) {
+    this(path, false, conf);
+  }
+
+  /**
+   * @param path file to iterate over
+   * @param reuseKeyValueInstances if true, reuses the key and value instances instead of creating new ones per read
+   * @param conf configuration handed to every {@link SequenceFileIterator} this object creates
+   */
+  public SequenceFileIterable(Path path, boolean reuseKeyValueInstances, Configuration conf) {
+    this.conf = conf;
+    this.reuseKeyValueInstances = reuseKeyValueInstances;
+    this.path = path;
+  }
+
+  @Override
+  public Iterator<Pair<K, V>> iterator() {
+    try {
+      return new SequenceFileIterator<>(path, reuseKeyValueInstances, conf);
+    } catch (IOException cause) {
+      throw new IllegalStateException(path.toString(), cause);
+    }
+  }
+
+}
+

http://git-wip-us.apache.org/repos/asf/mahout/blob/5eda9e1f/community/mahout-mr/src/main/java/org/apache/mahout/common/iterator/sequencefile/SequenceFileIterator.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/src/main/java/org/apache/mahout/common/iterator/sequencefile/SequenceFileIterator.java b/community/mahout-mr/src/main/java/org/apache/mahout/common/iterator/sequencefile/SequenceFileIterator.java
new file mode 100644
index 0000000..bc5c549
--- /dev/null
+++ b/community/mahout-mr/src/main/java/org/apache/mahout/common/iterator/sequencefile/SequenceFileIterator.java
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.common.iterator.sequencefile;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+import com.google.common.collect.AbstractIterator;
+import com.google.common.io.Closeables;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.mahout.common.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * <p>{@link java.util.Iterator} over a {@link SequenceFile}'s keys and values, as a {@link Pair}
+ * containing key and value.</p>
+ */
+public final class SequenceFileIterator<K extends Writable,V extends Writable>
+  extends AbstractIterator<Pair<K,V>> implements Closeable {
+
+  private final SequenceFile.Reader reader;
+  private final Configuration conf;
+  private final Class<K> keyClass;
+  private final Class<V> valueClass;
+  private final boolean noValue;
+  private K key;
+  private V value;
+  private final boolean reuseKeyValueInstances;
+
+  private static final Logger log = LoggerFactory.getLogger(SequenceFileIterator.class);
+
+  /**
+   * @param path sequence file to iterate over
+   * @param reuseKeyValueInstances if true, the same key and value objects are refilled on every read
+   * @param conf configuration used to open the file and to instantiate keys and values
+   * @throws IOException if path can't be read, or its key or value class can't be instantiated
+   */
+  @SuppressWarnings("unchecked") // the reader reports raw classes; K and V cannot be verified statically
+  public SequenceFileIterator(Path path, boolean reuseKeyValueInstances, Configuration conf) throws IOException {
+    FileSystem fs = path.getFileSystem(conf);
+    path = path.makeQualified(fs);
+    reader = new SequenceFile.Reader(fs, path, conf);
+    this.conf = conf;
+    keyClass = (Class<K>) reader.getKeyClass();
+    valueClass = (Class<V>) reader.getValueClass();
+    noValue = NullWritable.class.equals(valueClass);
+    this.reuseKeyValueInstances = reuseKeyValueInstances;
+  }
+
+  public Class<K> getKeyClass() {
+    return keyClass;
+  }
+
+  public Class<V> getValueClass() {
+    return valueClass;
+  }
+
+  /** Drops the current key and value, closes the reader (swallowing close failures) and ends the iteration. */
+  @Override
+  public void close() throws IOException {
+    key = null;
+    value = null;
+    Closeables.close(reader, true);
+
+    endOfData();
+  }
+
+  /** Reads the next record; on end of file or on a read failure this iterator closes itself. */
+  @Override
+  protected Pair<K,V> computeNext() {
+    // Test the key, not the value: value stays null for NullWritable files, which would
+    // otherwise force a fresh key on every call even when reuse was requested.
+    if (!reuseKeyValueInstances || key == null) {
+      key = ReflectionUtils.newInstance(keyClass, conf);
+      if (!noValue) {
+        value = ReflectionUtils.newInstance(valueClass, conf);
+      }
+    }
+    try {
+      boolean available = noValue ? reader.next(key) : reader.next(key, value);
+      if (!available) {
+        close();
+        return null;
+      }
+      return new Pair<>(key, value);
+    } catch (IOException ioe) {
+      try {
+        close();
+      } catch (IOException e) {
+        log.error(e.getMessage(), e);
+      }
+      throw new IllegalStateException(ioe);
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/5eda9e1f/community/mahout-mr/src/main/java/org/apache/mahout/common/iterator/sequencefile/SequenceFileValueIterable.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/src/main/java/org/apache/mahout/common/iterator/sequencefile/SequenceFileValueIterable.java b/community/mahout-mr/src/main/java/org/apache/mahout/common/iterator/sequencefile/SequenceFileValueIterable.java
new file mode 100644
index 0000000..d2fdf8d
--- /dev/null
+++ b/community/mahout-mr/src/main/java/org/apache/mahout/common/iterator/sequencefile/SequenceFileValueIterable.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.common.iterator.sequencefile;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * <p>An {@link Iterable} whose iterators are {@link SequenceFileValueIterator}s over a single file.</p>
+ */
+public final class SequenceFileValueIterable<V extends Writable> implements Iterable<V> {
+
+  private final Path path;
+  private final Configuration conf;
+  private final boolean reuseKeyValueInstances;
+
+  /**
+   * Equivalent to {@link #SequenceFileValueIterable(Path, boolean, Configuration)} with
+   * {@code reuseKeyValueInstances} set to {@code false}.
+   *
+   * @param path file to iterate over
+   * @param conf configuration passed on to every iterator that is created
+   */
+  public SequenceFileValueIterable(Path path, Configuration conf) {
+    this(path, false, conf);
+  }
+
+  /**
+   * @param path file to iterate over
+   * @param reuseKeyValueInstances if true, reuses instances of the value object instead of creating a new
+   *  one for each read from the file
+   * @param conf configuration passed on to every iterator that is created
+   */
+  public SequenceFileValueIterable(Path path, boolean reuseKeyValueInstances, Configuration conf) {
+    this.conf = conf;
+    this.reuseKeyValueInstances = reuseKeyValueInstances;
+    this.path = path;
+  }
+
+  /**
+   * Creates a new {@link SequenceFileValueIterator} over the file.
+   *
+   * @throws IllegalStateException wrapping the {@link IOException} raised while creating the iterator;
+   *  its message is the path
+   */
+  @Override
+  public Iterator<V> iterator() {
+    try {
+      return new SequenceFileValueIterator<>(path, reuseKeyValueInstances, conf);
+    } catch (IOException openFailure) {
+      throw new IllegalStateException(path.toString(), openFailure);
+    }
+  }
+
+}
+

http://git-wip-us.apache.org/repos/asf/mahout/blob/5eda9e1f/community/mahout-mr/src/main/java/org/apache/mahout/common/iterator/sequencefile/SequenceFileValueIterator.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/src/main/java/org/apache/mahout/common/iterator/sequencefile/SequenceFileValueIterator.java b/community/mahout-mr/src/main/java/org/apache/mahout/common/iterator/sequencefile/SequenceFileValueIterator.java
new file mode 100644
index 0000000..cb2295c
--- /dev/null
+++ b/community/mahout-mr/src/main/java/org/apache/mahout/common/iterator/sequencefile/SequenceFileValueIterator.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.common.iterator.sequencefile;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+import com.google.common.collect.AbstractIterator;
+import com.google.common.io.Closeables;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * <p>{@link java.util.Iterator} over a {@link SequenceFile}'s values only.</p>
+ *
+ * <p>The underlying reader is closed when the end of the file is reached, when a read fails, or
+ * when {@link #close()} is called.</p>
+ */
+public final class SequenceFileValueIterator<V extends Writable> extends AbstractIterator<V> implements Closeable {
+
+  private static final Logger log = LoggerFactory.getLogger(SequenceFileValueIterator.class);
+
+  private final SequenceFile.Reader reader;
+  private final Configuration conf;
+  private final Class<V> valueClass;
+  private final Writable key;
+  private V value;
+  private final boolean reuseKeyValueInstances;
+
+  /**
+   * Opens a reader on {@code path} and prepares a key instance that every read will fill.
+   *
+   * @param path file to iterate over; it is qualified against its file system before being opened
+   * @param reuseKeyValueInstances if true, one value instance is reused across reads instead of
+   *  creating a new one for each read
+   * @param conf configuration used to resolve the file system and to instantiate key and value objects
+   * @throws IOException if the file system cannot be obtained or the reader cannot be opened
+   */
+  public SequenceFileValueIterator(Path path, boolean reuseKeyValueInstances, Configuration conf) throws IOException {
+    value = null;
+    FileSystem fs = path.getFileSystem(conf);
+    path = fs.makeQualified(path);
+    reader = new SequenceFile.Reader(fs, path, conf);
+    this.conf = conf;
+    this.reuseKeyValueInstances = reuseKeyValueInstances;
+    try {
+      Class<? extends Writable> keyClass = (Class<? extends Writable>) reader.getKeyClass();
+      key = ReflectionUtils.newInstance(keyClass, conf);
+      valueClass = (Class<V>) reader.getValueClass();
+    } catch (RuntimeException e) {
+      // The reader is already open at this point: release it before propagating the failure,
+      // otherwise nobody holds a reference that could ever close it.
+      Closeables.close(reader, true);
+      throw e;
+    }
+  }
+
+  /** Returns the value class reported by the underlying reader. */
+  public Class<V> getValueClass() {
+    return valueClass;
+  }
+
+  /**
+   * Drops the reference to the current value, closes the underlying reader and marks this
+   * iterator as exhausted.
+   */
+  @Override
+  public void close() throws IOException {
+    value = null;
+    // 'true': an IOException raised while closing the reader is swallowed by Closeables
+    Closeables.close(reader, true);
+    endOfData();
+  }
+
+  /**
+   * Reads the next record and returns its value; the key is read into a shared instance and ignored.
+   *
+   * @throws IllegalStateException wrapping the {@link IOException} raised by the reader; the
+   *  iterator is closed before the exception is thrown
+   */
+  @Override
+  protected V computeNext() {
+    if (!reuseKeyValueInstances || value == null) {
+      value = ReflectionUtils.newInstance(valueClass, conf);
+    }
+    try {
+      boolean available = reader.next(key, value);
+      if (!available) {
+        // close() calls endOfData(), so the value returned here is ignored by AbstractIterator
+        close();
+        return null;
+      }
+      return value;
+    } catch (IOException ioe) {
+      try {
+        close();
+      } catch (IOException e) {
+        log.error(e.getMessage(), e);
+      }
+      throw new IllegalStateException(ioe);
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/5eda9e1f/community/mahout-mr/src/main/java/org/apache/mahout/common/lucene/AnalyzerUtils.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/src/main/java/org/apache/mahout/common/lucene/AnalyzerUtils.java b/community/mahout-mr/src/main/java/org/apache/mahout/common/lucene/AnalyzerUtils.java
new file mode 100644
index 0000000..742d6cf
--- /dev/null
+++ b/community/mahout-mr/src/main/java/org/apache/mahout/common/lucene/AnalyzerUtils.java
@@ -0,0 +1,61 @@
+package org.apache.mahout.common.lucene;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.lucene.analysis.Analyzer;
+import org.apache.lucene.util.Version;
+import org.apache.mahout.common.ClassUtils;
+
+public final class AnalyzerUtils {
+
+  /** Lucene version passed to analyzer constructors when the caller does not supply one. */
+  private static final Version DEFAULT_VERSION = Version.LUCENE_5_5_2;
+
+  private AnalyzerUtils() {}
+
+  /**
+   * Create an Analyzer from its class name using the default {@link org.apache.lucene.util.Version}
+   * ({@code LUCENE_5_5_2}).  Note, if you need to pass in
+   * parameters to your constructor, you will need to wrap it in an implementation that does not take any arguments
+   * @param analyzerClassName - Lucene Analyzer Name
+   * @return {@link Analyzer}
+   * @throws ClassNotFoundException - {@link ClassNotFoundException}
+   */
+  public static Analyzer createAnalyzer(String analyzerClassName) throws ClassNotFoundException {
+    return createAnalyzer(analyzerClassName, DEFAULT_VERSION);
+  }
+
+  /**
+   * Create an Analyzer from its class name for the given {@link org.apache.lucene.util.Version}.
+   * @param analyzerClassName - Lucene Analyzer Name
+   * @param version version offered to the analyzer's constructor
+   * @return {@link Analyzer}
+   * @throws ClassNotFoundException if no class with that name can be loaded
+   * @throws ClassCastException if the named class is not a subclass of {@link Analyzer}
+   */
+  public static Analyzer createAnalyzer(String analyzerClassName, Version version) throws ClassNotFoundException {
+    Class<? extends Analyzer> analyzerClass = Class.forName(analyzerClassName).asSubclass(Analyzer.class);
+    return createAnalyzer(analyzerClass, version);
+  }
+
+  /**
+   * Create an Analyzer using the default {@link org.apache.lucene.util.Version}
+   * ({@code LUCENE_5_5_2}).  Note, if you need to pass in
+   * parameters to your constructor, you will need to wrap it in an implementation that does not take any arguments
+   * @param analyzerClass The Analyzer Class to instantiate
+   * @return {@link Analyzer}
+   */
+  public static Analyzer createAnalyzer(Class<? extends Analyzer> analyzerClass) {
+    return createAnalyzer(analyzerClass, DEFAULT_VERSION);
+  }
+
+  /**
+   * Create an Analyzer for the given {@link org.apache.lucene.util.Version}.  The constructor taking a
+   * single {@link Version} argument is tried first; if that attempt fails with an
+   * {@link IllegalStateException}, the class is instantiated without constructor arguments instead.
+   * @param analyzerClass The Analyzer Class to instantiate
+   * @param version version offered to the analyzer's constructor
+   * @return {@link Analyzer}
+   */
+  public static Analyzer createAnalyzer(Class<? extends Analyzer> analyzerClass, Version version) {
+    try {
+      return ClassUtils.instantiateAs(analyzerClass, Analyzer.class,
+          new Class<?>[] { Version.class }, new Object[] { version });
+    } catch (IllegalStateException e) {
+      // fall back to instantiation without constructor arguments
+      return ClassUtils.instantiateAs(analyzerClass, Analyzer.class);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/5eda9e1f/community/mahout-mr/src/main/java/org/apache/mahout/common/lucene/IteratorTokenStream.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/src/main/java/org/apache/mahout/common/lucene/IteratorTokenStream.java b/community/mahout-mr/src/main/java/org/apache/mahout/common/lucene/IteratorTokenStream.java
new file mode 100644
index 0000000..5facad8
--- /dev/null
+++ b/community/mahout-mr/src/main/java/org/apache/mahout/common/lucene/IteratorTokenStream.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.common.lucene;
+
+import org.apache.lucene.analysis.TokenStream;
+import org.apache.lucene.analysis.tokenattributes.CharTermAttribute;
+
+import java.util.Iterator;
+
+/** Exposes the strings supplied by an {@link Iterator} as a {@link TokenStream}, one term per string. */
+public final class IteratorTokenStream extends TokenStream {
+  private final CharTermAttribute termAtt;
+  private final Iterator<String> iterator;
+
+  /**
+   * @param iterator source of the terms, consumed one element per call to {@link #incrementToken()}
+   */
+  public IteratorTokenStream(Iterator<String> iterator) {
+    this.iterator = iterator;
+    this.termAtt = addAttribute(CharTermAttribute.class);
+  }
+
+  /**
+   * Moves to the next string of the iterator, if any, and stores it in the term attribute.
+   *
+   * @return {@code false} once the iterator has no more elements, {@code true} otherwise
+   */
+  @Override
+  public boolean incrementToken() {
+    if (!iterator.hasNext()) {
+      return false;
+    }
+    clearAttributes();
+    termAtt.append(iterator.next());
+    return true;
+  }
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/5eda9e1f/community/mahout-mr/src/main/java/org/apache/mahout/common/lucene/TokenStreamIterator.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/src/main/java/org/apache/mahout/common/lucene/TokenStreamIterator.java b/community/mahout-mr/src/main/java/org/apache/mahout/common/lucene/TokenStreamIterator.java
new file mode 100644
index 0000000..af60d8b
--- /dev/null
+++ b/community/mahout-mr/src/main/java/org/apache/mahout/common/lucene/TokenStreamIterator.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.common.lucene;
+
+import com.google.common.collect.AbstractIterator;
+import org.apache.lucene.analysis.TokenStream;
+import org.apache.lucene.analysis.tokenattributes.CharTermAttribute;
+
+import java.io.IOException;
+
+/**
+ * Provide an Iterator for the tokens in a TokenStream.
+ *
+ * Note, it is the responsibility of the instantiating class to properly consume the
+ * {@link org.apache.lucene.analysis.TokenStream}.  See the Lucene {@link org.apache.lucene.analysis.TokenStream}
+ * documentation for more information.
+ */
+//TODO: consider using the char/byte arrays instead of strings, esp. when we upgrade to Lucene 4.0
+public final class TokenStreamIterator extends AbstractIterator<String> {
+
+  private final TokenStream tokenStream;
+
+  /**
+   * @param tokenStream stream whose terms are returned; it is ended and closed by this iterator
+   *  once it reports that it has no more tokens
+   */
+  public TokenStreamIterator(TokenStream tokenStream) {
+    this.tokenStream = tokenStream;
+  }
+
+  /**
+   * Advances the stream and returns the current term as a string; when the stream is exhausted it
+   * is ended and closed and the end of data is signalled.
+   *
+   * @throws IllegalStateException wrapping any {@link IOException} raised by the stream
+   */
+  @Override
+  protected String computeNext() {
+    try {
+      if (!tokenStream.incrementToken()) {
+        tokenStream.end();
+        tokenStream.close();
+        return endOfData();
+      }
+      return tokenStream.getAttribute(CharTermAttribute.class).toString();
+    } catch (IOException e) {
+      throw new IllegalStateException("IO error while tokenizing", e);
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/5eda9e1f/community/mahout-mr/src/main/java/org/apache/mahout/common/mapreduce/MergeVectorsCombiner.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/src/main/java/org/apache/mahout/common/mapreduce/MergeVectorsCombiner.java b/community/mahout-mr/src/main/java/org/apache/mahout/common/mapreduce/MergeVectorsCombiner.java
new file mode 100644
index 0000000..8e0385d
--- /dev/null
+++ b/community/mahout-mr/src/main/java/org/apache/mahout/common/mapreduce/MergeVectorsCombiner.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.common.mapreduce;
+
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.mahout.math.VectorWritable;
+
+import java.io.IOException;
+
+/** Combiner that writes, for each key, the result of {@link VectorWritable#merge} over the key's vectors. */
+public class MergeVectorsCombiner
+    extends Reducer<WritableComparable<?>,VectorWritable,WritableComparable<?>,VectorWritable> {
+
+  @Override
+  public void reduce(WritableComparable<?> key, Iterable<VectorWritable> vectors, Context ctx)
+    throws IOException, InterruptedException {
+    VectorWritable merged = VectorWritable.merge(vectors.iterator());
+    ctx.write(key, merged);
+  }
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/5eda9e1f/community/mahout-mr/src/main/java/org/apache/mahout/common/mapreduce/MergeVectorsReducer.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/src/main/java/org/apache/mahout/common/mapreduce/MergeVectorsReducer.java b/community/mahout-mr/src/main/java/org/apache/mahout/common/mapreduce/MergeVectorsReducer.java
new file mode 100644
index 0000000..b8d5dea
--- /dev/null
+++ b/community/mahout-mr/src/main/java/org/apache/mahout/common/mapreduce/MergeVectorsReducer.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.common.mapreduce;
+
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.mahout.math.SequentialAccessSparseVector;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.VectorWritable;
+
+import java.io.IOException;
+
+/**
+ * Reducer that merges all vectors of a key via {@link VectorWritable#merge} and writes the result
+ * as a {@link SequentialAccessSparseVector}.
+ */
+public class MergeVectorsReducer extends
+    Reducer<WritableComparable<?>,VectorWritable,WritableComparable<?>,VectorWritable> {
+
+  /** Output holder, reused for every key. */
+  private final VectorWritable result = new VectorWritable();
+
+  @Override
+  public void reduce(WritableComparable<?> key, Iterable<VectorWritable> vectors, Context ctx)
+    throws IOException, InterruptedException {
+    VectorWritable mergedWritable = VectorWritable.merge(vectors.iterator());
+    Vector sequential = new SequentialAccessSparseVector(mergedWritable.get());
+    result.set(sequential);
+    ctx.write(key, result);
+  }
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/5eda9e1f/community/mahout-mr/src/main/java/org/apache/mahout/common/mapreduce/TransposeMapper.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/src/main/java/org/apache/mahout/common/mapreduce/TransposeMapper.java b/community/mahout-mr/src/main/java/org/apache/mahout/common/mapreduce/TransposeMapper.java
new file mode 100644
index 0000000..c6c3f05
--- /dev/null
+++ b/community/mahout-mr/src/main/java/org/apache/mahout/common/mapreduce/TransposeMapper.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.common.mapreduce;
+
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.mahout.math.RandomAccessSparseVector;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.VectorWritable;
+
+import java.io.IOException;
+
+/**
+ * Mapper that emits, for every non-zero element of an input row, a single-entry vector keyed by the
+ * element's index and holding the element's value at the position of the input row.
+ */
+public class TransposeMapper extends Mapper<IntWritable,VectorWritable,IntWritable,VectorWritable> {
+
+  /** Configuration key for the cardinality of the emitted vectors. */
+  public static final String NEW_NUM_COLS_PARAM = TransposeMapper.class.getName() + ".newNumCols";
+
+  private int newNumCols;
+
+  /** Reads {@link #NEW_NUM_COLS_PARAM} from the job configuration, defaulting to {@link Integer#MAX_VALUE}. */
+  @Override
+  protected void setup(Context ctx) throws IOException, InterruptedException {
+    newNumCols = ctx.getConfiguration().getInt(NEW_NUM_COLS_PARAM, Integer.MAX_VALUE);
+  }
+
+  @Override
+  protected void map(IntWritable r, VectorWritable v, Context ctx) throws IOException, InterruptedException {
+    // remember the input row first: the key object is overwritten and reused as the output key below
+    int sourceRow = r.get();
+    for (Vector.Element element : v.get().nonZeroes()) {
+      Vector column = new RandomAccessSparseVector(newNumCols, 1);
+      column.setQuick(sourceRow, element.get());
+      r.set(element.index());
+      ctx.write(r, new VectorWritable(column));
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/5eda9e1f/community/mahout-mr/src/main/java/org/apache/mahout/common/mapreduce/VectorSumCombiner.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/src/main/java/org/apache/mahout/common/mapreduce/VectorSumCombiner.java b/community/mahout-mr/src/main/java/org/apache/mahout/common/mapreduce/VectorSumCombiner.java
new file mode 100644
index 0000000..1d93386
--- /dev/null
+++ b/community/mahout-mr/src/main/java/org/apache/mahout/common/mapreduce/VectorSumCombiner.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.common.mapreduce;
+
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.mahout.math.VectorWritable;
+import org.apache.mahout.math.hadoop.similarity.cooccurrence.Vectors;
+
+import java.io.IOException;
+
+/** Combiner that writes, for each key, the result of {@link Vectors#sum} over the key's vectors. */
+public class VectorSumCombiner
+    extends Reducer<WritableComparable<?>, VectorWritable, WritableComparable<?>, VectorWritable> {
+
+  /** Output holder, reused for every key. */
+  private final VectorWritable result = new VectorWritable();
+
+  @Override
+  protected void reduce(WritableComparable<?> key, Iterable<VectorWritable> values, Context ctx)
+    throws IOException, InterruptedException {
+    result.set(Vectors.sum(values.iterator()));
+    ctx.write(key, result);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/mahout/blob/5eda9e1f/community/mahout-mr/src/main/java/org/apache/mahout/common/mapreduce/VectorSumReducer.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/src/main/java/org/apache/mahout/common/mapreduce/VectorSumReducer.java b/community/mahout-mr/src/main/java/org/apache/mahout/common/mapreduce/VectorSumReducer.java
new file mode 100644
index 0000000..97d3805
--- /dev/null
+++ b/community/mahout-mr/src/main/java/org/apache/mahout/common/mapreduce/VectorSumReducer.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.common.mapreduce;
+
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.mahout.math.VectorWritable;
+import org.apache.mahout.math.hadoop.similarity.cooccurrence.Vectors;
+
+import java.io.IOException;
+
+/** Reducer that writes, for each key, the result of {@link Vectors#sum} over the key's vectors. */
+public class VectorSumReducer
+    extends Reducer<WritableComparable<?>, VectorWritable, WritableComparable<?>, VectorWritable> {
+
+  @Override
+  protected void reduce(WritableComparable<?> key, Iterable<VectorWritable> values, Context ctx)
+    throws IOException, InterruptedException {
+    VectorWritable summed = new VectorWritable(Vectors.sum(values.iterator()));
+    ctx.write(key, summed);
+  }
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/5eda9e1f/community/mahout-mr/src/main/java/org/apache/mahout/common/nlp/NGrams.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/src/main/java/org/apache/mahout/common/nlp/NGrams.java b/community/mahout-mr/src/main/java/org/apache/mahout/common/nlp/NGrams.java
new file mode 100644
index 0000000..7adadc1
--- /dev/null
+++ b/community/mahout-mr/src/main/java/org/apache/mahout/common/nlp/NGrams.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.common.nlp;
+
+import com.google.common.base.CharMatcher;
+import com.google.common.base.Splitter;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Splits a line on spaces and tabs and produces word n-grams of up to {@code gramSize} words.
+ *
+ * <p>A window of the last {@code gramSize} words is maintained; after each word is added, every
+ * prefix of the window (joined with single spaces) is emitted. For the words {@code a b c} and a
+ * gram size of 2 the output is {@code [a, a, a b, b, b c]}.</p>
+ */
+public class NGrams {
+
+  private static final Splitter SPACE_TAB = Splitter.on(CharMatcher.anyOf(" \t"));
+
+  private final String line;
+  private final int gramSize;
+
+  /**
+   * @param line text to split into words
+   * @param gramSize maximum number of words kept in the window, i.e. the longest gram produced
+   */
+  public NGrams(String line, int gramSize) {
+    this.line = line;
+    this.gramSize = gramSize;
+  }
+
+  /**
+   * Treats the first word of the line as a label and builds grams from the remaining words.
+   *
+   * @return a single-entry map from the label to the grams of the rest of the line
+   */
+  public Map<String,List<String>> generateNGrams() {
+    Map<String,List<String>> returnDocument = Maps.newHashMap();
+
+    Iterator<String> tokenizer = SPACE_TAB.split(line).iterator();
+    String labelName = tokenizer.next();
+    returnDocument.put(labelName, collectGrams(tokenizer));
+    return returnDocument;
+  }
+
+  /**
+   * Builds grams from every word of the line, without treating the first word specially.
+   *
+   * @return the grams of the whole line
+   */
+  public List<String> generateNGramsWithoutLabel() {
+    return collectGrams(SPACE_TAB.split(line).iterator());
+  }
+
+  /** Runs the sliding window over {@code words} and returns all emitted grams in order. */
+  private List<String> collectGrams(Iterator<String> words) {
+    List<String> tokens = Lists.newArrayList();
+    List<String> previousN1Grams = Lists.newArrayList();
+    while (words.hasNext()) {
+      String nextToken = words.next();
+      // keep at most gramSize words: evict the oldest one before adding the new one
+      if (previousN1Grams.size() == gramSize) {
+        previousN1Grams.remove(0);
+      }
+      previousN1Grams.add(nextToken);
+
+      // emit every prefix of the window: "w1", "w1 w2", ...
+      StringBuilder gramBuilder = new StringBuilder();
+      for (String gram : previousN1Grams) {
+        gramBuilder.append(gram);
+        tokens.add(gramBuilder.toString());
+        gramBuilder.append(' ');
+      }
+    }
+    return tokens;
+  }
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/5eda9e1f/community/mahout-mr/src/main/java/org/apache/mahout/common/parameters/AbstractParameter.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/src/main/java/org/apache/mahout/common/parameters/AbstractParameter.java b/community/mahout-mr/src/main/java/org/apache/mahout/common/parameters/AbstractParameter.java
new file mode 100644
index 0000000..f0a7aa8
--- /dev/null
+++ b/community/mahout-mr/src/main/java/org/apache/mahout/common/parameters/AbstractParameter.java
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.common.parameters;
+
+import java.util.Collection;
+import java.util.Collections;
+
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Base implementation of {@link Parameter} that stores a single value together with its prefix,
+ * name, description, type and the string form of its default.
+ */
+public abstract class AbstractParameter<T> implements Parameter<T> {
+
+  private final Class<T> type;
+  private final String prefix;
+  private final String name;
+  private final String description;
+  private final String defaultValue;
+  private T value;
+
+  /**
+   * Starts from {@code defaultValue} and, if {@code jobConf} holds an entry under
+   * {@code prefix + name}, replaces the value by parsing that entry with {@code setStringValue}.
+   *
+   * @param type class of the parameter value
+   * @param prefix prepended to {@code name} to form the configuration key
+   * @param name name of the parameter
+   * @param jobConf configuration that may override the default
+   * @param defaultValue initial value; its string form is what {@link #defaultValue()} returns
+   * @param description human-readable description
+   */
+  protected AbstractParameter(Class<T> type,
+                              String prefix,
+                              String name,
+                              Configuration jobConf,
+                              T defaultValue,
+                              String description) {
+    this.type = type;
+    this.name = name;
+    this.description = description;
+
+    // the value must be set before getStringValue() is asked for the default's string form
+    this.value = defaultValue;
+    this.defaultValue = getStringValue();
+
+    this.prefix = prefix;
+    String configured = jobConf.get(prefix + name);
+    if (configured != null) {
+      setStringValue(configured);
+    }
+
+  }
+
+  @Override
+  public void configure(Configuration jobConf) {
+    // intentionally empty
+  }
+
+  @Override
+  public void createParameters(String prefix, Configuration jobConf) {
+    // intentionally empty
+  }
+
+  /** Returns the current value's {@code toString()}, or {@code null} when no value is set. */
+  @Override
+  public String getStringValue() {
+    return value == null ? null : value.toString();
+  }
+
+  /** Always returns an empty collection. */
+  @Override
+  public Collection<Parameter<?>> getParameters() {
+    return Collections.emptyList();
+  }
+
+  @Override
+  public String prefix() {
+    return this.prefix;
+  }
+
+  @Override
+  public String name() {
+    return this.name;
+  }
+
+  @Override
+  public String description() {
+    return this.description;
+  }
+
+  @Override
+  public Class<T> type() {
+    return this.type;
+  }
+
+  /** Returns the string form of the default value as computed in the constructor. */
+  @Override
+  public String defaultValue() {
+    return this.defaultValue;
+  }
+
+  @Override
+  public T get() {
+    return this.value;
+  }
+
+  @Override
+  public void set(T value) {
+    this.value = value;
+  }
+
+  /** Returns the value's {@code toString()} when a value is set, otherwise the inherited {@code toString()}. */
+  @Override
+  public String toString() {
+    return value != null ? value.toString() : super.toString();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/5eda9e1f/community/mahout-mr/src/main/java/org/apache/mahout/common/parameters/ClassParameter.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/src/main/java/org/apache/mahout/common/parameters/ClassParameter.java b/community/mahout-mr/src/main/java/org/apache/mahout/common/parameters/ClassParameter.java
new file mode 100644
index 0000000..1d1c0bb
--- /dev/null
+++ b/community/mahout-mr/src/main/java/org/apache/mahout/common/parameters/ClassParameter.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.common.parameters;
+
+import org.apache.hadoop.conf.Configuration;
+
+public class ClassParameter extends AbstractParameter<Class> {
+  
+  public ClassParameter(String prefix, String name, Configuration jobConf, Class<?> defaultValue, String description) {
+    super(Class.class, prefix, name, jobConf, defaultValue, description);
+  }
+  
+  @Override
+  public void setStringValue(String stringValue) {
+    try {
+      set(Class.forName(stringValue));
+    } catch (ClassNotFoundException e) {
+      throw new IllegalStateException(e);
+    }
+  }
+  
+  @Override
+  public String getStringValue() {
+    if (get() == null) {
+      return null;
+    }
+    return get().getName();
+  }
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/5eda9e1f/community/mahout-mr/src/main/java/org/apache/mahout/common/parameters/DoubleParameter.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/src/main/java/org/apache/mahout/common/parameters/DoubleParameter.java b/community/mahout-mr/src/main/java/org/apache/mahout/common/parameters/DoubleParameter.java
new file mode 100644
index 0000000..cb3efcf
--- /dev/null
+++ b/community/mahout-mr/src/main/java/org/apache/mahout/common/parameters/DoubleParameter.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.common.parameters;
+
+import org.apache.hadoop.conf.Configuration;
+
+/** A {@link Parameter} holding a {@link Double}; string values go through {@link Double#valueOf(String)}. */
+public class DoubleParameter extends AbstractParameter<Double> {
+  public DoubleParameter(String prefix, String name, Configuration conf, double defaultValue, String description) {
+    super(Double.class, prefix, name, conf, Double.valueOf(defaultValue), description);
+  }
+  
+  /** Parses {@code stringValue} as a double and stores it; malformed text raises {@link NumberFormatException}. */
+  @Override
+  public void setStringValue(String stringValue) {
+    set(Double.valueOf(stringValue));
+  }
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/5eda9e1f/community/mahout-mr/src/main/java/org/apache/mahout/common/parameters/Parameter.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/src/main/java/org/apache/mahout/common/parameters/Parameter.java b/community/mahout-mr/src/main/java/org/apache/mahout/common/parameters/Parameter.java
new file mode 100644
index 0000000..292fa27
--- /dev/null
+++ b/community/mahout-mr/src/main/java/org/apache/mahout/common/parameters/Parameter.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.common.parameters;
+
+/**
+ * An accessor to a parameter in the job.
+ * 
+ * This is a composite entity that can itself contain more parameters. Say the parameter describes what
+ * DistanceMeasure class to use; once set, this parameter would also produce the parameters available in that
+ * DistanceMeasure implementation.
+ */
+public interface Parameter<T> extends Parametered {
+  /** @return job configuration setting key prefix, e.g. 'org.apache.mahout.util.WeightedDistanceMeasure.' */
+  String prefix();
+  
+  /** @return configuration parameter name, e.g. 'weightsFile' */
+  String name();
+  
+  /** @return human readable description of the parameter */
+  String description();
+  
+  /** @return value class type */
+  Class<T> type();
+  
+  /**
+   * @param stringValue
+   *          string representation of the new value
+   */
+  void setStringValue(String stringValue);
+  
+  /**
+   * @return string representation of the current value
+   */
+  String getStringValue();
+  
+  /**
+   * @param value
+   *          new parameter value
+   */
+  void set(T value);
+  
+  /** @return current parameter value */
+  T get();
+  
+  /** @return string representation of the value used if not set by consumer */
+  String defaultValue();
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/5eda9e1f/community/mahout-mr/src/main/java/org/apache/mahout/common/parameters/Parametered.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/src/main/java/org/apache/mahout/common/parameters/Parametered.java b/community/mahout-mr/src/main/java/org/apache/mahout/common/parameters/Parametered.java
new file mode 100644
index 0000000..96c9457
--- /dev/null
+++ b/community/mahout-mr/src/main/java/org/apache/mahout/common/parameters/Parametered.java
@@ -0,0 +1,206 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.common.parameters;
+
+import java.util.Collection;
+
+import org.apache.hadoop.conf.Configuration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Meta information and accessors for configuring a job. */
+public interface Parametered {
+  
+  Logger log = LoggerFactory.getLogger(Parametered.class);
+  
+  /** @return the parameters exposed by this object; may be empty, and each may hold nested parameters */
+  Collection<Parameter<?>> getParameters();
+  
+  /**
+   * EXPERT: consumers should never have to call this method. It would be friendly visible to
+   * {@link ParameteredGeneralizations} if java supported it. Calling this method should create a new list of
+   * parameters; it is called by the {@code configureParameters} methods of {@link ParameteredGeneralizations}.
+   * 
+   * @param prefix
+   *          ends with a dot if not empty.
+   * @param jobConf
+   *          configuration used for retrieving values
+   * @see ParameteredGeneralizations#configureParameters(String,Parametered,Configuration)
+   *      invoking method
+   * @see ParameteredGeneralizations#configureParametersRecursively(Parametered,String,Configuration)
+   *      invoking method
+   */
+  void createParameters(String prefix, Configuration jobConf);
+  
+  /** Invoked by {@link ParameteredGeneralizations} on each nested parameter, right after its createParameters. */
+  void configure(Configuration config);
+  
+  /** "multiple inheritance" */
+  final class ParameteredGeneralizations {
+    private ParameteredGeneralizations() { }
+    
+    /** Configures {@code parametered}, using its simple class name followed by a dot as the key prefix. */
+    public static void configureParameters(Parametered parametered, Configuration jobConf) {
+      configureParameters(parametered.getClass().getSimpleName() + '.', parametered, jobConf);
+    }
+
+    /**
+     * Calls
+     * {@link Parametered#createParameters(String,org.apache.hadoop.conf.Configuration)}
+     * on parameter parametered, and then recurs down its composite tree to invoke
+     * {@link Parametered#createParameters(String,org.apache.hadoop.conf.Configuration)}
+     * and {@link Parametered#configure(org.apache.hadoop.conf.Configuration)} on
+     * each composite part.
+     * 
+     * @param prefix
+     *          ends with a dot if not empty.
+     * @param parametered
+     *          instance to be configured
+     * @param jobConf
+     *          configuration used for retrieving values
+     */
+    public static void configureParameters(String prefix, Parametered parametered, Configuration jobConf) {
+      parametered.createParameters(prefix, jobConf);
+      configureParametersRecursively(parametered, prefix, jobConf);
+    }
+    
+    /** Creates and configures every nested parameter, appending each parameter's name and a dot to the prefix. */
+    private static void configureParametersRecursively(Parametered parametered, String prefix, Configuration jobConf) {
+      for (Parameter<?> parameter : parametered.getParameters()) {
+        // Parameterized logging already skips formatting when debug is off, so no isDebugEnabled() guard.
+        log.debug("Configuring {}{}", prefix, parameter.name());
+        String name = prefix + parameter.name() + '.';
+        parameter.createParameters(name, jobConf);
+        parameter.configure(jobConf);
+        if (!parameter.getParameters().isEmpty()) {
+          configureParametersRecursively(parameter, name, jobConf);
+        }
+      }
+    }
+    
+    /** @return help text with one entry per parameter: key, description and default value (if any) */
+    public static String help(Parametered parametered) {
+      return new Help(parametered).toString();
+    }
+    
+    /** @return configuration listing: a "# description" line and a "key = value" line per parameter */
+    public static String conf(Parametered parametered) {
+      return new Conf(parametered).toString();
+    }
+    
+    /** Builds the help text: key (prefix + name), padding to a common column, description, default value. */
+    private static final class Help {
+      static final int NAME_DESC_DISTANCE = 8;
+
+      private final StringBuilder sb;
+      private int longestName;
+      private int numChars = 100; // a few extra just to be sure
+
+      private Help(Parametered parametered) {
+        recurseCount(parametered);
+        numChars += (longestName + NAME_DESC_DISTANCE) * parametered.getParameters().size();
+        sb = new StringBuilder(numChars);
+        recurseWrite(parametered);
+      }
+      
+      @Override
+      public String toString() {
+        return sb.toString();
+      }
+
+      private void recurseCount(Parametered parametered) {
+        for (Parameter<?> parameter : parametered.getParameters()) {
+          // Measure the whole printed key (prefix + name): recurseWrite subtracts both from longestName, so
+          // measuring the name alone shrank or removed the gap before the description for prefixed keys.
+          int keyLength = parameter.prefix().length() + parameter.name().length();
+          longestName = Math.max(longestName, keyLength);
+          recurseCount(parameter);
+          numChars += parameter.description().length();
+        }
+      }
+      
+      private void recurseWrite(Parametered parametered) {
+        for (Parameter<?> parameter : parametered.getParameters()) {
+          sb.append(parameter.prefix());
+          sb.append(parameter.name());
+          int max = longestName - parameter.name().length() - parameter.prefix().length()
+                    + NAME_DESC_DISTANCE;
+          for (int i = 0; i < max; i++) {
+            sb.append(' ');
+          }
+          sb.append(parameter.description());
+          if (parameter.defaultValue() != null) {
+            sb.append(" (default value '");
+            sb.append(parameter.defaultValue());
+            sb.append("')");
+          }
+          sb.append('\n');
+          recurseWrite(parameter);
+        }
+      }
+    }
+    
+    /** Builds the configuration listing: "# description", then "key = value", then a blank line. */
+    private static final class Conf {
+      private final StringBuilder sb;
+      private int numChars = 100; // a few extra just to be sure
+
+      private Conf(Parametered parametered) {
+        recurseCount(parametered);
+        sb = new StringBuilder(numChars);
+        recurseWrite(parametered);
+      }
+      
+      @Override
+      public String toString() {
+        return sb.toString();
+      }
+
+      private void recurseCount(Parametered parametered) {
+        for (Parameter<?> parameter : parametered.getParameters()) {
+          numChars += parameter.prefix().length() + parameter.name().length();
+          numChars += 8; // literal characters per entry: "# " + '\n' + " = " + '\n' + '\n'
+          numChars += parameter.description().length();
+          String value = parameter.getStringValue();
+          if (value != null) {
+            numChars += value.length();
+          }
+          recurseCount(parameter);
+        }
+      }
+      
+      private void recurseWrite(Parametered parametered) {
+        for (Parameter<?> parameter : parametered.getParameters()) {
+          sb.append("# ");
+          sb.append(parameter.description());
+          sb.append('\n');
+          sb.append(parameter.prefix());
+          sb.append(parameter.name());
+          sb.append(" = ");
+          String value = parameter.getStringValue();
+          if (value != null) {
+            sb.append(value);
+          }
+          sb.append('\n');
+          sb.append('\n');
+          recurseWrite(parameter);
+        }
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/5eda9e1f/community/mahout-mr/src/main/java/org/apache/mahout/common/parameters/PathParameter.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/src/main/java/org/apache/mahout/common/parameters/PathParameter.java b/community/mahout-mr/src/main/java/org/apache/mahout/common/parameters/PathParameter.java
new file mode 100644
index 0000000..a617fe3
--- /dev/null
+++ b/community/mahout-mr/src/main/java/org/apache/mahout/common/parameters/PathParameter.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.common.parameters;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+
+public class PathParameter extends AbstractParameter<Path> {
+  // A parameter whose value is a Hadoop Path; string values are converted with Path's String constructor.
+  public PathParameter(String prefix, String name, Configuration jobConf, Path defaultValue, String description) {
+    super(Path.class, prefix, name, jobConf, defaultValue, description);
+  }
+  
+  @Override
+  public void setStringValue(String stringValue) {
+    set(new Path(stringValue)); // stores the Path built from the given string
+  }
+}