You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mahout.apache.org by ra...@apache.org on 2018/06/04 14:29:26 UTC
[24/53] [abbrv] [partial] mahout git commit: end of day 6-2-2018
http://git-wip-us.apache.org/repos/asf/mahout/blob/5eda9e1f/community/mahout-mr/src/main/java/org/apache/mahout/common/iterator/sequencefile/PathFilters.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/src/main/java/org/apache/mahout/common/iterator/sequencefile/PathFilters.java b/community/mahout-mr/src/main/java/org/apache/mahout/common/iterator/sequencefile/PathFilters.java
new file mode 100644
index 0000000..19f78b5
--- /dev/null
+++ b/community/mahout-mr/src/main/java/org/apache/mahout/common/iterator/sequencefile/PathFilters.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.common.iterator.sequencefile;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+
+/**
+ * Supplies some useful and repeatedly-used instances of {@link PathFilter}.
+ */
+public final class PathFilters {
+
+  /**
+   * Accepts paths whose file name starts with "part-" (job output shards), excluding ".crc" checksum files.
+   */
+  private static final PathFilter PART_FILE_INSTANCE = new PathFilter() {
+    @Override
+    public boolean accept(Path path) {
+      String name = path.getName();
+      return name.startsWith("part-") && !name.endsWith(".crc");
+    }
+  };
+
+  /**
+   * Accepts the final clustering output: paths whose file name starts with "clusters-" and ends with "-final".
+   */
+  private static final PathFilter CLUSTER_FINAL = new PathFilter() {
+    @Override
+    public boolean accept(Path path) {
+      String name = path.getName();
+      return name.startsWith("clusters-") && name.endsWith("-final");
+    }
+  };
+
+  /**
+   * Rejects ".crc" checksum files and any path whose file name starts with "." or "_"; accepts everything else.
+   */
+  private static final PathFilter LOGS_CRC_INSTANCE = new PathFilter() {
+    @Override
+    public boolean accept(Path path) {
+      String name = path.getName();
+      return !(name.endsWith(".crc") || name.startsWith(".") || name.startsWith("_"));
+    }
+  };
+
+  /** Static holder for shared filter instances; not instantiable. */
+  private PathFilters() {
+  }
+
+  /**
+   * @return {@link PathFilter} that accepts paths whose file name starts with "part-". Excludes
+   * ".crc" files.
+   */
+  public static PathFilter partFilter() {
+    return PART_FILE_INSTANCE;
+  }
+
+  /**
+   * @return {@link PathFilter} that accepts paths whose file name starts with "clusters-" and ends with
+   * "-final", i.e. the final clustering output.
+   */
+  public static PathFilter finalPartFilter() {
+    return CLUSTER_FINAL;
+  }
+
+  /**
+   * @return {@link PathFilter} that rejects paths whose file name starts with "_" (e.g. Cloudera
+   * _SUCCESS files or Hadoop _logs), or "." (e.g. local hidden files), or ends with ".crc"
+   */
+  public static PathFilter logsCRCFilter() {
+    return LOGS_CRC_INSTANCE;
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/5eda9e1f/community/mahout-mr/src/main/java/org/apache/mahout/common/iterator/sequencefile/PathType.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/src/main/java/org/apache/mahout/common/iterator/sequencefile/PathType.java b/community/mahout-mr/src/main/java/org/apache/mahout/common/iterator/sequencefile/PathType.java
new file mode 100644
index 0000000..7ea713e
--- /dev/null
+++ b/community/mahout-mr/src/main/java/org/apache/mahout/common/iterator/sequencefile/PathType.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.common.iterator.sequencefile;
+
+/**
+ * Used by {@link SequenceFileDirIterable} and the like to select whether the input path specifies a
+ * directory to list, or a glob pattern.
+ */
+public enum PathType {
+ GLOB,
+ LIST,
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/5eda9e1f/community/mahout-mr/src/main/java/org/apache/mahout/common/iterator/sequencefile/SequenceFileDirIterable.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/src/main/java/org/apache/mahout/common/iterator/sequencefile/SequenceFileDirIterable.java b/community/mahout-mr/src/main/java/org/apache/mahout/common/iterator/sequencefile/SequenceFileDirIterable.java
new file mode 100644
index 0000000..ca4d6b8
--- /dev/null
+++ b/community/mahout-mr/src/main/java/org/apache/mahout/common/iterator/sequencefile/SequenceFileDirIterable.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.common.iterator.sequencefile;
+
+import java.io.IOException;
+import java.util.Comparator;
+import java.util.Iterator;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.io.Writable;
+import org.apache.mahout.common.Pair;
+
+/**
+ * <p>{@link Iterable} counterpart to {@link SequenceFileDirIterator}: every call to {@link #iterator()}
+ * opens a new {@link SequenceFileDirIterator} over the files selected by this object's settings.</p>
+ */
+public final class SequenceFileDirIterable<K extends Writable, V extends Writable> implements Iterable<Pair<K, V>> {
+
+  private final Configuration conf;
+  private final Path path;
+  private final PathType pathType;
+  private final PathFilter filter;
+  private final Comparator<FileStatus> ordering;
+  private final boolean reuseKeyValueInstances;
+
+  /**
+   * Iterates over every file selected by {@code path}, with no filter, no explicit ordering and
+   * without reusing key/value instances.
+   */
+  public SequenceFileDirIterable(Path path, PathType pathType, Configuration conf) {
+    this(path, pathType, null, null, false, conf);
+  }
+
+  /**
+   * Iterates over the files selected by {@code path} and accepted by {@code filter}, with no explicit
+   * ordering and without reusing key/value instances.
+   */
+  public SequenceFileDirIterable(Path path, PathType pathType, PathFilter filter, Configuration conf) {
+    this(path, pathType, filter, null, false, conf);
+  }
+
+  /**
+   * @param path directory ({@link PathType#LIST}) or glob pattern ({@link PathType#GLOB}) naming the files to read
+   * @param pathType whether to treat {@code path} as a directory to list ({@link PathType#LIST}) or
+   *        a glob pattern to expand ({@link PathType#GLOB})
+   * @param filter if not null, restricts the iteration to the sequence files this filter accepts
+   * @param ordering if not null, specifies the order in which to iterate over matching sequence files
+   * @param reuseKeyValueInstances if true, each file's reader refills the same key and value instances
+   *        instead of creating new ones for every record
+   * @param conf Hadoop configuration used to locate and open the files
+   */
+  public SequenceFileDirIterable(Path path,
+                                 PathType pathType,
+                                 PathFilter filter,
+                                 Comparator<FileStatus> ordering,
+                                 boolean reuseKeyValueInstances,
+                                 Configuration conf) {
+    this.conf = conf;
+    this.path = path;
+    this.pathType = pathType;
+    this.filter = filter;
+    this.ordering = ordering;
+    this.reuseKeyValueInstances = reuseKeyValueInstances;
+  }
+
+  /**
+   * @return a freshly opened iterator over all matching files
+   * @throws IllegalStateException wrapping the {@link IOException} if the files cannot be listed or opened
+   */
+  @Override
+  public Iterator<Pair<K, V>> iterator() {
+    SequenceFileDirIterator<K, V> opened;
+    try {
+      opened = new SequenceFileDirIterator<>(path, pathType, filter, ordering, reuseKeyValueInstances, conf);
+    } catch (IOException ioe) {
+      throw new IllegalStateException(path.toString(), ioe);
+    }
+    return opened;
+  }
+
+}
+
http://git-wip-us.apache.org/repos/asf/mahout/blob/5eda9e1f/community/mahout-mr/src/main/java/org/apache/mahout/common/iterator/sequencefile/SequenceFileDirIterator.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/src/main/java/org/apache/mahout/common/iterator/sequencefile/SequenceFileDirIterator.java b/community/mahout-mr/src/main/java/org/apache/mahout/common/iterator/sequencefile/SequenceFileDirIterator.java
new file mode 100644
index 0000000..cf6a871
--- /dev/null
+++ b/community/mahout-mr/src/main/java/org/apache/mahout/common/iterator/sequencefile/SequenceFileDirIterator.java
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.common.iterator.sequencefile;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+
+import com.google.common.base.Function;
+import com.google.common.collect.ForwardingIterator;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Lists;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.io.Writable;
+import org.apache.mahout.common.HadoopUtil;
+import org.apache.mahout.common.IOUtils;
+import org.apache.mahout.common.Pair;
+
+/**
+ * Like {@link SequenceFileIterator}, but iterates not just over one sequence file, but many. The input path
+ * may be specified as a directory of files to read, or as a glob pattern. The set of files may be optionally
+ * restricted with a {@link PathFilter}.
+ *
+ * <p>Per-file iterators are opened lazily, one at a time, as iteration advances. Call {@link #close()}
+ * to release any that are still open (for example after abandoning the iteration early, or after a
+ * file failed to open).</p>
+ */
+public final class SequenceFileDirIterator<K extends Writable,V extends Writable>
+    extends ForwardingIterator<Pair<K,V>> implements Closeable {
+
+  private static final FileStatus[] NO_STATUSES = new FileStatus[0];
+
+  private Iterator<Pair<K,V>> delegate;
+  /** Every per-file iterator opened so far, in the order it was opened. */
+  private final List<SequenceFileIterator<K,V>> iterators;
+
+  /**
+   * Multifile sequence file iterator where files are specified explicitly by
+   * path parameters.
+   *
+   * @param path files to iterate over, in the given order; each must exist. An empty array yields an
+   *        empty iterator.
+   * @param reuseKeyValueInstances passed to each {@link SequenceFileIterator}
+   * @param conf Hadoop configuration used to resolve file systems and open readers
+   * @throws IOException if the status of any path cannot be obtained
+   */
+  public SequenceFileDirIterator(Path[] path,
+                                 boolean reuseKeyValueInstances,
+                                 Configuration conf) throws IOException {
+
+    iterators = Lists.newArrayList();
+    // We assume all files exist, otherwise getFileStatus throws and we bail out. Each path is resolved
+    // against its own file system, so paths on different file systems may be mixed.
+    FileStatus[] statuses = new FileStatus[path.length];
+    for (int i = 0; i < statuses.length; i++) {
+      statuses[i] = path[i].getFileSystem(conf).getFileStatus(path[i]);
+    }
+    init(statuses, reuseKeyValueInstances, conf);
+  }
+
+  /**
+   * Constructor that uses either {@link FileSystem#listStatus(Path)} or
+   * {@link FileSystem#globStatus(Path)} to obtain list of files to iterate over
+   * (depending on pathType parameter).
+   *
+   * @param path directory to list or glob pattern to expand
+   * @param pathType how to interpret {@code path}
+   * @param filter if not null, restricts the files iterated over
+   * @param ordering if not null, the order in which matching files are iterated
+   * @param reuseKeyValueInstances passed to each {@link SequenceFileIterator}
+   * @param conf Hadoop configuration
+   * @throws IOException if the file statuses cannot be obtained
+   */
+  public SequenceFileDirIterator(Path path,
+                                 PathType pathType,
+                                 PathFilter filter,
+                                 Comparator<FileStatus> ordering,
+                                 boolean reuseKeyValueInstances,
+                                 Configuration conf) throws IOException {
+
+    FileStatus[] statuses = HadoopUtil.getFileStatus(path, pathType, filter, ordering, conf);
+    iterators = Lists.newArrayList();
+    init(statuses, reuseKeyValueInstances, conf);
+  }
+
+  /**
+   * Builds the lazily-concatenated delegate over the given files. No file is opened here.
+   */
+  private void init(FileStatus[] statuses,
+                    final boolean reuseKeyValueInstances,
+                    final Configuration conf) {
+
+    // Hadoop may return null rather than an empty array when nothing qualified; treat that as
+    // "no files" so callers get an empty iterator instead of an NPE.
+    FileStatus[] files = statuses == null ? NO_STATUSES : statuses;
+
+    // Iterators.transform is a lazy view: apply() runs only when concat() advances to the next file,
+    // so each file is opened on demand and recorded in `iterators` for close().
+    Iterator<Iterator<Pair<K, V>>> fsIterators =
+        Iterators.transform(Iterators.forArray(files),
+            new Function<FileStatus, Iterator<Pair<K, V>>>() {
+              @Override
+              public Iterator<Pair<K, V>> apply(FileStatus from) {
+                try {
+                  SequenceFileIterator<K, V> iterator = new SequenceFileIterator<>(from.getPath(),
+                      reuseKeyValueInstances, conf);
+                  iterators.add(iterator);
+                  return iterator;
+                } catch (IOException ioe) {
+                  throw new IllegalStateException(from.getPath().toString(), ioe);
+                }
+              }
+            });
+
+    delegate = Iterators.concat(fsIterators);
+  }
+
+  @Override
+  protected Iterator<Pair<K,V>> delegate() {
+    return delegate;
+  }
+
+  /**
+   * Closes every per-file iterator opened so far, most recently opened first.
+   */
+  @Override
+  public void close() throws IOException {
+    // Files are opened lazily, so the reverse order can only be established here, at close time.
+    List<SequenceFileIterator<K,V>> newestFirst = Lists.newArrayList(iterators);
+    Collections.reverse(newestFirst);
+    IOUtils.close(newestFirst);
+    iterators.clear();
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/5eda9e1f/community/mahout-mr/src/main/java/org/apache/mahout/common/iterator/sequencefile/SequenceFileDirValueIterable.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/src/main/java/org/apache/mahout/common/iterator/sequencefile/SequenceFileDirValueIterable.java b/community/mahout-mr/src/main/java/org/apache/mahout/common/iterator/sequencefile/SequenceFileDirValueIterable.java
new file mode 100644
index 0000000..1cb4ebc
--- /dev/null
+++ b/community/mahout-mr/src/main/java/org/apache/mahout/common/iterator/sequencefile/SequenceFileDirValueIterable.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.common.iterator.sequencefile;
+
+import java.io.IOException;
+import java.util.Comparator;
+import java.util.Iterator;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * <p>{@link Iterable} counterpart to {@link SequenceFileDirValueIterator}: every call to {@link #iterator()}
+ * opens a new {@link SequenceFileDirValueIterator} over the files selected by this object's settings.</p>
+ */
+public final class SequenceFileDirValueIterable<V extends Writable> implements Iterable<V> {
+
+  private final Configuration conf;
+  private final Path path;
+  private final PathType pathType;
+  private final PathFilter filter;
+  private final Comparator<FileStatus> ordering;
+  private final boolean reuseKeyValueInstances;
+
+  /**
+   * Iterates over the values of every file selected by {@code path}, with no filter, no explicit
+   * ordering and without instance reuse.
+   */
+  public SequenceFileDirValueIterable(Path path, PathType pathType, Configuration conf) {
+    this(path, pathType, null, null, false, conf);
+  }
+
+  /**
+   * Iterates over the values of the files selected by {@code path} and accepted by {@code filter},
+   * with no explicit ordering and without instance reuse.
+   */
+  public SequenceFileDirValueIterable(Path path, PathType pathType, PathFilter filter, Configuration conf) {
+    this(path, pathType, filter, null, false, conf);
+  }
+
+  /**
+   * @param path directory ({@link PathType#LIST}) or glob pattern ({@link PathType#GLOB}) naming the files to read
+   * @param pathType whether to treat {@code path} as a directory to list ({@link PathType#LIST}) or
+   *        a glob pattern to expand ({@link PathType#GLOB})
+   * @param filter if not null, restricts the iteration to the sequence files this filter accepts
+   * @param ordering if not null, specifies the order in which to iterate over matching sequence files
+   * @param reuseKeyValueInstances if true, reuses instances of the value object instead of creating a new
+   *        one for each read from the file
+   * @param conf Hadoop configuration used to locate and open the files
+   */
+  public SequenceFileDirValueIterable(Path path,
+                                      PathType pathType,
+                                      PathFilter filter,
+                                      Comparator<FileStatus> ordering,
+                                      boolean reuseKeyValueInstances,
+                                      Configuration conf) {
+    this.conf = conf;
+    this.path = path;
+    this.pathType = pathType;
+    this.filter = filter;
+    this.ordering = ordering;
+    this.reuseKeyValueInstances = reuseKeyValueInstances;
+  }
+
+  /**
+   * @return a freshly opened iterator over the values of all matching files
+   * @throws IllegalStateException wrapping the {@link IOException} if the files cannot be listed or opened
+   */
+  @Override
+  public Iterator<V> iterator() {
+    SequenceFileDirValueIterator<V> opened;
+    try {
+      opened = new SequenceFileDirValueIterator<>(path, pathType, filter, ordering, reuseKeyValueInstances, conf);
+    } catch (IOException ioe) {
+      throw new IllegalStateException(path.toString(), ioe);
+    }
+    return opened;
+  }
+
+}
+
http://git-wip-us.apache.org/repos/asf/mahout/blob/5eda9e1f/community/mahout-mr/src/main/java/org/apache/mahout/common/iterator/sequencefile/SequenceFileDirValueIterator.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/src/main/java/org/apache/mahout/common/iterator/sequencefile/SequenceFileDirValueIterator.java b/community/mahout-mr/src/main/java/org/apache/mahout/common/iterator/sequencefile/SequenceFileDirValueIterator.java
new file mode 100644
index 0000000..908c8bb
--- /dev/null
+++ b/community/mahout-mr/src/main/java/org/apache/mahout/common/iterator/sequencefile/SequenceFileDirValueIterator.java
@@ -0,0 +1,159 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.common.iterator.sequencefile;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+
+import com.google.common.base.Function;
+import com.google.common.collect.ForwardingIterator;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Lists;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.io.Writable;
+import org.apache.mahout.common.IOUtils;
+
+/**
+ * Like {@link SequenceFileValueIterator}, but iterates not just over one
+ * sequence file, but many. The input path may be specified as a directory of
+ * files to read, or as a glob pattern. The set of files may be optionally
+ * restricted with a {@link PathFilter}.
+ *
+ * <p>Per-file iterators are opened lazily, one at a time, as iteration advances.
+ * Call {@link #close()} to release any that are still open (for example after
+ * abandoning the iteration early, or after a file failed to open).</p>
+ */
+public final class SequenceFileDirValueIterator<V extends Writable> extends
+    ForwardingIterator<V> implements Closeable {
+
+  private static final FileStatus[] NO_STATUSES = new FileStatus[0];
+
+  private Iterator<V> delegate;
+  /** Every per-file iterator opened so far, in the order it was opened. */
+  private final List<SequenceFileValueIterator<V>> iterators;
+
+  /**
+   * Constructor that uses either {@link FileSystem#listStatus(Path)} or
+   * {@link FileSystem#globStatus(Path)} to obtain list of files to iterate over
+   * (depending on pathType parameter).
+   *
+   * @param path directory to list ({@link PathType#LIST}) or glob pattern to expand ({@link PathType#GLOB})
+   * @param pathType how to interpret {@code path}
+   * @param filter if not null, only files accepted by this filter are iterated
+   * @param ordering if not null, the order in which matching files are iterated
+   * @param reuseKeyValueInstances passed to each {@link SequenceFileValueIterator}
+   * @param conf Hadoop configuration
+   * @throws IOException if the file statuses cannot be obtained
+   */
+  public SequenceFileDirValueIterator(Path path,
+                                      PathType pathType,
+                                      PathFilter filter,
+                                      Comparator<FileStatus> ordering,
+                                      boolean reuseKeyValueInstances,
+                                      Configuration conf) throws IOException {
+    FileStatus[] statuses;
+    FileSystem fs = FileSystem.get(path.toUri(), conf);
+    if (filter == null) {
+      statuses = pathType == PathType.GLOB ? fs.globStatus(path) : fs.listStatus(path);
+    } else {
+      statuses = pathType == PathType.GLOB ? fs.globStatus(path, filter) : fs.listStatus(path, filter);
+    }
+    iterators = Lists.newArrayList();
+    init(statuses, ordering, reuseKeyValueInstances, conf);
+  }
+
+  /**
+   * Multifile sequence file iterator where files are specified explicitly by
+   * path parameters.
+   *
+   * @param path files to iterate over; each must exist. An empty array yields an empty iterator.
+   * @param ordering if not null, the order in which the files are iterated; otherwise array order
+   * @param reuseKeyValueInstances passed to each {@link SequenceFileValueIterator}
+   * @param conf Hadoop configuration used to resolve file systems and open readers
+   * @throws IOException if the status of any path cannot be obtained
+   */
+  public SequenceFileDirValueIterator(Path[] path,
+                                      Comparator<FileStatus> ordering,
+                                      boolean reuseKeyValueInstances,
+                                      Configuration conf) throws IOException {
+
+    iterators = Lists.newArrayList();
+    // We assume all files exist, otherwise getFileStatus throws and we bail out. Each path is resolved
+    // against its own file system, so paths on different file systems may be mixed.
+    FileStatus[] statuses = new FileStatus[path.length];
+    for (int i = 0; i < statuses.length; i++) {
+      statuses[i] = path[i].getFileSystem(conf).getFileStatus(path[i]);
+    }
+    init(statuses, ordering, reuseKeyValueInstances, conf);
+  }
+
+  /**
+   * Sorts the files if requested and builds the lazily-concatenated delegate. No file is opened here,
+   * so a failure in this method cannot leak file handles.
+   */
+  private void init(FileStatus[] statuses,
+                    Comparator<FileStatus> ordering,
+                    final boolean reuseKeyValueInstances,
+                    final Configuration conf) {
+
+    // Hadoop may return null rather than an empty array when nothing qualified; treat that as
+    // "no files" so callers get an empty iterator instead of an NPE.
+    FileStatus[] files = statuses == null ? NO_STATUSES : statuses;
+
+    if (ordering != null) {
+      Arrays.sort(files, ordering);
+    }
+
+    // Iterators.transform is a lazy view: apply() runs only when concat() advances to the next file,
+    // so each file is opened on demand and recorded in `iterators` for close().
+    Iterator<Iterator<V>> fsIterators =
+        Iterators.transform(Iterators.forArray(files),
+            new Function<FileStatus, Iterator<V>>() {
+              @Override
+              public Iterator<V> apply(FileStatus from) {
+                try {
+                  SequenceFileValueIterator<V> iterator = new SequenceFileValueIterator<>(from.getPath(),
+                      reuseKeyValueInstances, conf);
+                  iterators.add(iterator);
+                  return iterator;
+                } catch (IOException ioe) {
+                  throw new IllegalStateException(from.getPath().toString(), ioe);
+                }
+              }
+            });
+
+    delegate = Iterators.concat(fsIterators);
+  }
+
+  @Override
+  protected Iterator<V> delegate() {
+    return delegate;
+  }
+
+  /**
+   * Closes every per-file iterator opened so far, most recently opened first.
+   */
+  @Override
+  public void close() throws IOException {
+    // Files are opened lazily, so the reverse order can only be established here, at close time.
+    List<SequenceFileValueIterator<V>> newestFirst = Lists.newArrayList(iterators);
+    Collections.reverse(newestFirst);
+    IOUtils.close(newestFirst);
+    iterators.clear();
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/5eda9e1f/community/mahout-mr/src/main/java/org/apache/mahout/common/iterator/sequencefile/SequenceFileIterable.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/src/main/java/org/apache/mahout/common/iterator/sequencefile/SequenceFileIterable.java b/community/mahout-mr/src/main/java/org/apache/mahout/common/iterator/sequencefile/SequenceFileIterable.java
new file mode 100644
index 0000000..f17c2a1
--- /dev/null
+++ b/community/mahout-mr/src/main/java/org/apache/mahout/common/iterator/sequencefile/SequenceFileIterable.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.common.iterator.sequencefile;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Writable;
+import org.apache.mahout.common.Pair;
+
+/**
+ * <p>{@link Iterable} counterpart to {@link SequenceFileIterator}.</p>
+ */
+public final class SequenceFileIterable<K extends Writable,V extends Writable> implements Iterable<Pair<K,V>> {
+
+ private final Path path;
+ private final boolean reuseKeyValueInstances;
+ private final Configuration conf;
+
+ /**
+ * Like {@link #SequenceFileIterable(Path, boolean, Configuration)} but key and value instances are not reused
+ * by default.
+ *
+ * @param path file to iterate over
+ */
+ public SequenceFileIterable(Path path, Configuration conf) {
+ this(path, false, conf);
+ }
+
+ /**
+ * @param path file to iterate over
+ * @param reuseKeyValueInstances if true, reuses instances of the key and value object instead of creating a new
+ * one for each read from the file
+ */
+ public SequenceFileIterable(Path path, boolean reuseKeyValueInstances, Configuration conf) {
+ this.path = path;
+ this.reuseKeyValueInstances = reuseKeyValueInstances;
+ this.conf = conf;
+ }
+
+ @Override
+ public Iterator<Pair<K, V>> iterator() {
+ try {
+ return new SequenceFileIterator<>(path, reuseKeyValueInstances, conf);
+ } catch (IOException ioe) {
+ throw new IllegalStateException(path.toString(), ioe);
+ }
+ }
+
+}
+
http://git-wip-us.apache.org/repos/asf/mahout/blob/5eda9e1f/community/mahout-mr/src/main/java/org/apache/mahout/common/iterator/sequencefile/SequenceFileIterator.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/src/main/java/org/apache/mahout/common/iterator/sequencefile/SequenceFileIterator.java b/community/mahout-mr/src/main/java/org/apache/mahout/common/iterator/sequencefile/SequenceFileIterator.java
new file mode 100644
index 0000000..bc5c549
--- /dev/null
+++ b/community/mahout-mr/src/main/java/org/apache/mahout/common/iterator/sequencefile/SequenceFileIterator.java
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.common.iterator.sequencefile;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+import com.google.common.collect.AbstractIterator;
+import com.google.common.io.Closeables;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.mahout.common.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * <p>{@link java.util.Iterator} over a {@link SequenceFile}'s keys and values, as a {@link Pair}
+ * containing key and value.</p>
+ *
+ * <p>The underlying reader is closed when the end of the file is reached or a read fails;
+ * {@link #close()} may also be called to release it early.</p>
+ */
+public final class SequenceFileIterator<K extends Writable,V extends Writable>
+    extends AbstractIterator<Pair<K,V>> implements Closeable {
+
+  private static final Logger log = LoggerFactory.getLogger(SequenceFileIterator.class);
+
+  private final SequenceFile.Reader reader;
+  private final Configuration conf;
+  private final Class<K> keyClass;
+  private final Class<V> valueClass;
+  /** True when the file's value class is NullWritable; then only keys are read and {@code value} stays null. */
+  private final boolean noValue;
+  private final boolean reuseKeyValueInstances;
+  private K key;
+  private V value;
+
+  /**
+   * @param path sequence file to read
+   * @param reuseKeyValueInstances if true, the same key and value instances are refilled on every read, so
+   *        every returned {@link Pair} refers to the same objects; if false, new instances are created per record
+   * @param conf Hadoop configuration used to open the file and instantiate keys and values
+   * @throws IOException if path can't be read, or its key or value class can't be instantiated
+   */
+  @SuppressWarnings("unchecked") // key/value classes come from the file; callers' K and V are trusted to match
+  public SequenceFileIterator(Path path, boolean reuseKeyValueInstances, Configuration conf) throws IOException {
+    FileSystem fs = path.getFileSystem(conf);
+    Path qualified = path.makeQualified(fs);
+    reader = new SequenceFile.Reader(fs, qualified, conf);
+    this.conf = conf;
+    keyClass = (Class<K>) reader.getKeyClass();
+    valueClass = (Class<V>) reader.getValueClass();
+    noValue = NullWritable.class.equals(valueClass);
+    this.reuseKeyValueInstances = reuseKeyValueInstances;
+  }
+
+  public Class<K> getKeyClass() {
+    return keyClass;
+  }
+
+  public Class<V> getValueClass() {
+    return valueClass;
+  }
+
+  /**
+   * Releases the underlying reader (an IOException while closing it is swallowed by
+   * {@link Closeables#close(Closeable, boolean)}) and ends the iteration.
+   */
+  @Override
+  public void close() throws IOException {
+    key = null;
+    value = null;
+    Closeables.close(reader, true);
+
+    endOfData();
+  }
+
+  @Override
+  protected Pair<K,V> computeNext() {
+    // Create fresh instances unless reuse was requested and they already exist. Test the key, which is
+    // always created: for NullWritable-valued files `value` is never created, so testing it would
+    // allocate a new key on every record and silently defeat reuseKeyValueInstances.
+    if (!reuseKeyValueInstances || key == null) {
+      key = ReflectionUtils.newInstance(keyClass, conf);
+      if (!noValue) {
+        value = ReflectionUtils.newInstance(valueClass, conf);
+      }
+    }
+    try {
+      boolean available = noValue ? reader.next(key) : reader.next(key, value);
+      if (!available) {
+        // End of file: close() releases the reader and calls endOfData(), so the null below is ignored.
+        close();
+        return null;
+      }
+      return new Pair<>(key, value);
+    } catch (IOException ioe) {
+      try {
+        close();
+      } catch (IOException e) {
+        log.error(e.getMessage(), e);
+      }
+      throw new IllegalStateException(ioe);
+    }
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/5eda9e1f/community/mahout-mr/src/main/java/org/apache/mahout/common/iterator/sequencefile/SequenceFileValueIterable.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/src/main/java/org/apache/mahout/common/iterator/sequencefile/SequenceFileValueIterable.java b/community/mahout-mr/src/main/java/org/apache/mahout/common/iterator/sequencefile/SequenceFileValueIterable.java
new file mode 100644
index 0000000..d2fdf8d
--- /dev/null
+++ b/community/mahout-mr/src/main/java/org/apache/mahout/common/iterator/sequencefile/SequenceFileValueIterable.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.common.iterator.sequencefile;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * <p>{@link Iterable} counterpart to {@link SequenceFileValueIterator}.</p>
+ */
+public final class SequenceFileValueIterable<V extends Writable> implements Iterable<V> {
+
+ private final Path path;
+ private final boolean reuseKeyValueInstances;
+ private final Configuration conf;
+
+ /**
+ * Like {@link #SequenceFileValueIterable(Path, boolean, Configuration)} but instances are not reused
+ * by default.
+ *
+ * @param path file to iterate over
+ */
+ public SequenceFileValueIterable(Path path, Configuration conf) {
+ this(path, false, conf);
+ }
+
+ /**
+ * @param path file to iterate over
+ * @param reuseKeyValueInstances if true, reuses instances of the value object instead of creating a new
+ * one for each read from the file
+ */
+ public SequenceFileValueIterable(Path path, boolean reuseKeyValueInstances, Configuration conf) {
+ this.path = path;
+ this.reuseKeyValueInstances = reuseKeyValueInstances;
+ this.conf = conf;
+ }
+
+ @Override
+ public Iterator<V> iterator() {
+ try {
+ return new SequenceFileValueIterator<>(path, reuseKeyValueInstances, conf);
+ } catch (IOException ioe) {
+ throw new IllegalStateException(path.toString(), ioe);
+ }
+ }
+
+}
+
http://git-wip-us.apache.org/repos/asf/mahout/blob/5eda9e1f/community/mahout-mr/src/main/java/org/apache/mahout/common/iterator/sequencefile/SequenceFileValueIterator.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/src/main/java/org/apache/mahout/common/iterator/sequencefile/SequenceFileValueIterator.java b/community/mahout-mr/src/main/java/org/apache/mahout/common/iterator/sequencefile/SequenceFileValueIterator.java
new file mode 100644
index 0000000..cb2295c
--- /dev/null
+++ b/community/mahout-mr/src/main/java/org/apache/mahout/common/iterator/sequencefile/SequenceFileValueIterator.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.common.iterator.sequencefile;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+import com.google.common.collect.AbstractIterator;
+import com.google.common.io.Closeables;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * <p>{@link java.util.Iterator} over a {@link SequenceFile}'s values only.</p>
+ */
+public final class SequenceFileValueIterator<V extends Writable> extends AbstractIterator<V> implements Closeable {
+
+ private final SequenceFile.Reader reader;
+ private final Configuration conf;
+ private final Class<V> valueClass;
+ private final Writable key;
+ private V value;
+ private final boolean reuseKeyValueInstances;
+
+ private static final Logger log = LoggerFactory.getLogger(SequenceFileValueIterator.class);
+
+ /**
+ * @throws IOException if path can't be read, or its key or value class can't be instantiated
+ */
+
+ public SequenceFileValueIterator(Path path, boolean reuseKeyValueInstances, Configuration conf) throws IOException {
+ value = null;
+ FileSystem fs = path.getFileSystem(conf);
+ path = fs.makeQualified(path);
+ reader = new SequenceFile.Reader(fs, path, conf);
+ this.conf = conf;
+ Class<? extends Writable> keyClass = (Class<? extends Writable>) reader.getKeyClass();
+ key = ReflectionUtils.newInstance(keyClass, conf);
+ valueClass = (Class<V>) reader.getValueClass();
+ this.reuseKeyValueInstances = reuseKeyValueInstances;
+ }
+
+ public Class<V> getValueClass() {
+ return valueClass;
+ }
+
+ @Override
+ public void close() throws IOException {
+ value = null;
+ Closeables.close(reader, true);
+ endOfData();
+ }
+
+ @Override
+ protected V computeNext() {
+ if (!reuseKeyValueInstances || value == null) {
+ value = ReflectionUtils.newInstance(valueClass, conf);
+ }
+ try {
+ boolean available = reader.next(key, value);
+ if (!available) {
+ close();
+ return null;
+ }
+ return value;
+ } catch (IOException ioe) {
+ try {
+ close();
+ } catch (IOException e) {
+ log.error(e.getMessage(), e);
+ }
+ throw new IllegalStateException(ioe);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/5eda9e1f/community/mahout-mr/src/main/java/org/apache/mahout/common/lucene/AnalyzerUtils.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/src/main/java/org/apache/mahout/common/lucene/AnalyzerUtils.java b/community/mahout-mr/src/main/java/org/apache/mahout/common/lucene/AnalyzerUtils.java
new file mode 100644
index 0000000..742d6cf
--- /dev/null
+++ b/community/mahout-mr/src/main/java/org/apache/mahout/common/lucene/AnalyzerUtils.java
@@ -0,0 +1,61 @@
+package org.apache.mahout.common.lucene;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.lucene.analysis.Analyzer;
+import org.apache.lucene.util.Version;
+import org.apache.mahout.common.ClassUtils;
+
+public final class AnalyzerUtils {
+
+ private AnalyzerUtils() {}
+
+ /**
+ * Create an Analyzer using the latest {@link org.apache.lucene.util.Version}. Note, if you need to pass in
+ * parameters to your constructor, you will need to wrap it in an implementation that does not take any arguments
+ * @param analyzerClassName - Lucene Analyzer Name
+ * @return {@link Analyzer}
+ * @throws ClassNotFoundException - {@link ClassNotFoundException}
+ */
+ public static Analyzer createAnalyzer(String analyzerClassName) throws ClassNotFoundException {
+ return createAnalyzer(analyzerClassName, Version.LUCENE_5_5_2);
+ }
+
+ public static Analyzer createAnalyzer(String analyzerClassName, Version version) throws ClassNotFoundException {
+ Class<? extends Analyzer> analyzerClass = Class.forName(analyzerClassName).asSubclass(Analyzer.class);
+ return createAnalyzer(analyzerClass, version);
+ }
+
+ /**
+ * Create an Analyzer using the latest {@link org.apache.lucene.util.Version}. Note, if you need to pass in
+ * parameters to your constructor, you will need to wrap it in an implementation that does not take any arguments
+ * @param analyzerClass The Analyzer Class to instantiate
+ * @return {@link Analyzer}
+ */
+ public static Analyzer createAnalyzer(Class<? extends Analyzer> analyzerClass) {
+ return createAnalyzer(analyzerClass, Version.LUCENE_5_5_2);
+ }
+
+ public static Analyzer createAnalyzer(Class<? extends Analyzer> analyzerClass, Version version) {
+ try {
+ return ClassUtils.instantiateAs(analyzerClass, Analyzer.class,
+ new Class<?>[] { Version.class }, new Object[] { version });
+ } catch (IllegalStateException e) {
+ return ClassUtils.instantiateAs(analyzerClass, Analyzer.class);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/5eda9e1f/community/mahout-mr/src/main/java/org/apache/mahout/common/lucene/IteratorTokenStream.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/src/main/java/org/apache/mahout/common/lucene/IteratorTokenStream.java b/community/mahout-mr/src/main/java/org/apache/mahout/common/lucene/IteratorTokenStream.java
new file mode 100644
index 0000000..5facad8
--- /dev/null
+++ b/community/mahout-mr/src/main/java/org/apache/mahout/common/lucene/IteratorTokenStream.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.common.lucene;
+
+import org.apache.lucene.analysis.TokenStream;
+import org.apache.lucene.analysis.tokenattributes.CharTermAttribute;
+
+import java.util.Iterator;
+
+/** Used to emit tokens from an input string array in the style of TokenStream */
+public final class IteratorTokenStream extends TokenStream {
+ private final CharTermAttribute termAtt;
+ private final Iterator<String> iterator;
+
+ public IteratorTokenStream(Iterator<String> iterator) {
+ this.iterator = iterator;
+ this.termAtt = addAttribute(CharTermAttribute.class);
+ }
+
+ @Override
+ public boolean incrementToken() {
+ if (iterator.hasNext()) {
+ clearAttributes();
+ termAtt.append(iterator.next());
+ return true;
+ } else {
+ return false;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/5eda9e1f/community/mahout-mr/src/main/java/org/apache/mahout/common/lucene/TokenStreamIterator.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/src/main/java/org/apache/mahout/common/lucene/TokenStreamIterator.java b/community/mahout-mr/src/main/java/org/apache/mahout/common/lucene/TokenStreamIterator.java
new file mode 100644
index 0000000..af60d8b
--- /dev/null
+++ b/community/mahout-mr/src/main/java/org/apache/mahout/common/lucene/TokenStreamIterator.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.common.lucene;
+
+import com.google.common.collect.AbstractIterator;
+import org.apache.lucene.analysis.TokenStream;
+import org.apache.lucene.analysis.tokenattributes.CharTermAttribute;
+
+import java.io.IOException;
+
+/**
+ * Provide an Iterator for the tokens in a TokenStream.
+ *
+ * Note, it is the responsibility of the instantiating class to properly consume the
+ * {@link org.apache.lucene.analysis.TokenStream}. See the Lucene {@link org.apache.lucene.analysis.TokenStream}
+ * documentation for more information.
+ */
+//TODO: consider using the char/byte arrays instead of strings, esp. when we upgrade to Lucene 4.0
+public final class TokenStreamIterator extends AbstractIterator<String> {
+
+ private final TokenStream tokenStream;
+
+ public TokenStreamIterator(TokenStream tokenStream) {
+ this.tokenStream = tokenStream;
+ }
+
+ @Override
+ protected String computeNext() {
+ try {
+ if (tokenStream.incrementToken()) {
+ return tokenStream.getAttribute(CharTermAttribute.class).toString();
+ } else {
+ tokenStream.end();
+ tokenStream.close();
+ return endOfData();
+ }
+ } catch (IOException e) {
+ throw new IllegalStateException("IO error while tokenizing", e);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/5eda9e1f/community/mahout-mr/src/main/java/org/apache/mahout/common/mapreduce/MergeVectorsCombiner.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/src/main/java/org/apache/mahout/common/mapreduce/MergeVectorsCombiner.java b/community/mahout-mr/src/main/java/org/apache/mahout/common/mapreduce/MergeVectorsCombiner.java
new file mode 100644
index 0000000..8e0385d
--- /dev/null
+++ b/community/mahout-mr/src/main/java/org/apache/mahout/common/mapreduce/MergeVectorsCombiner.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.common.mapreduce;
+
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.mahout.math.VectorWritable;
+
+import java.io.IOException;
+
+public class MergeVectorsCombiner
+ extends Reducer<WritableComparable<?>,VectorWritable,WritableComparable<?>,VectorWritable> {
+
+ @Override
+ public void reduce(WritableComparable<?> key, Iterable<VectorWritable> vectors, Context ctx)
+ throws IOException, InterruptedException {
+ ctx.write(key, VectorWritable.merge(vectors.iterator()));
+ }
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/5eda9e1f/community/mahout-mr/src/main/java/org/apache/mahout/common/mapreduce/MergeVectorsReducer.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/src/main/java/org/apache/mahout/common/mapreduce/MergeVectorsReducer.java b/community/mahout-mr/src/main/java/org/apache/mahout/common/mapreduce/MergeVectorsReducer.java
new file mode 100644
index 0000000..b8d5dea
--- /dev/null
+++ b/community/mahout-mr/src/main/java/org/apache/mahout/common/mapreduce/MergeVectorsReducer.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.common.mapreduce;
+
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.mahout.math.SequentialAccessSparseVector;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.VectorWritable;
+
+import java.io.IOException;
+
+public class MergeVectorsReducer extends
+ Reducer<WritableComparable<?>,VectorWritable,WritableComparable<?>,VectorWritable> {
+
+ private final VectorWritable result = new VectorWritable();
+
+ @Override
+ public void reduce(WritableComparable<?> key, Iterable<VectorWritable> vectors, Context ctx)
+ throws IOException, InterruptedException {
+ Vector merged = VectorWritable.merge(vectors.iterator()).get();
+ result.set(new SequentialAccessSparseVector(merged));
+ ctx.write(key, result);
+ }
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/5eda9e1f/community/mahout-mr/src/main/java/org/apache/mahout/common/mapreduce/TransposeMapper.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/src/main/java/org/apache/mahout/common/mapreduce/TransposeMapper.java b/community/mahout-mr/src/main/java/org/apache/mahout/common/mapreduce/TransposeMapper.java
new file mode 100644
index 0000000..c6c3f05
--- /dev/null
+++ b/community/mahout-mr/src/main/java/org/apache/mahout/common/mapreduce/TransposeMapper.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.common.mapreduce;
+
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.mahout.math.RandomAccessSparseVector;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.VectorWritable;
+
+import java.io.IOException;
+
+public class TransposeMapper extends Mapper<IntWritable,VectorWritable,IntWritable,VectorWritable> {
+
+ public static final String NEW_NUM_COLS_PARAM = TransposeMapper.class.getName() + ".newNumCols";
+
+ private int newNumCols;
+
+ @Override
+ protected void setup(Context ctx) throws IOException, InterruptedException {
+ newNumCols = ctx.getConfiguration().getInt(NEW_NUM_COLS_PARAM, Integer.MAX_VALUE);
+ }
+
+ @Override
+ protected void map(IntWritable r, VectorWritable v, Context ctx) throws IOException, InterruptedException {
+ int row = r.get();
+ for (Vector.Element e : v.get().nonZeroes()) {
+ RandomAccessSparseVector tmp = new RandomAccessSparseVector(newNumCols, 1);
+ tmp.setQuick(row, e.get());
+ r.set(e.index());
+ ctx.write(r, new VectorWritable(tmp));
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/5eda9e1f/community/mahout-mr/src/main/java/org/apache/mahout/common/mapreduce/VectorSumCombiner.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/src/main/java/org/apache/mahout/common/mapreduce/VectorSumCombiner.java b/community/mahout-mr/src/main/java/org/apache/mahout/common/mapreduce/VectorSumCombiner.java
new file mode 100644
index 0000000..1d93386
--- /dev/null
+++ b/community/mahout-mr/src/main/java/org/apache/mahout/common/mapreduce/VectorSumCombiner.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.common.mapreduce;
+
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.mahout.math.VectorWritable;
+import org.apache.mahout.math.hadoop.similarity.cooccurrence.Vectors;
+
+import java.io.IOException;
+
+public class VectorSumCombiner
+ extends Reducer<WritableComparable<?>, VectorWritable, WritableComparable<?>, VectorWritable> {
+
+ private final VectorWritable result = new VectorWritable();
+
+ @Override
+ protected void reduce(WritableComparable<?> key, Iterable<VectorWritable> values, Context ctx)
+ throws IOException, InterruptedException {
+ result.set(Vectors.sum(values.iterator()));
+ ctx.write(key, result);
+ }
+ }
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/mahout/blob/5eda9e1f/community/mahout-mr/src/main/java/org/apache/mahout/common/mapreduce/VectorSumReducer.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/src/main/java/org/apache/mahout/common/mapreduce/VectorSumReducer.java b/community/mahout-mr/src/main/java/org/apache/mahout/common/mapreduce/VectorSumReducer.java
new file mode 100644
index 0000000..97d3805
--- /dev/null
+++ b/community/mahout-mr/src/main/java/org/apache/mahout/common/mapreduce/VectorSumReducer.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.common.mapreduce;
+
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.mahout.math.VectorWritable;
+import org.apache.mahout.math.hadoop.similarity.cooccurrence.Vectors;
+
+import java.io.IOException;
+
+public class VectorSumReducer
+ extends Reducer<WritableComparable<?>, VectorWritable, WritableComparable<?>, VectorWritable> {
+
+ @Override
+ protected void reduce(WritableComparable<?> key, Iterable<VectorWritable> values, Context ctx)
+ throws IOException, InterruptedException {
+ ctx.write(key, new VectorWritable(Vectors.sum(values.iterator())));
+ }
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/5eda9e1f/community/mahout-mr/src/main/java/org/apache/mahout/common/nlp/NGrams.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/src/main/java/org/apache/mahout/common/nlp/NGrams.java b/community/mahout-mr/src/main/java/org/apache/mahout/common/nlp/NGrams.java
new file mode 100644
index 0000000..7adadc1
--- /dev/null
+++ b/community/mahout-mr/src/main/java/org/apache/mahout/common/nlp/NGrams.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.common.nlp;
+
import com.google.common.base.CharMatcher;
import com.google.common.base.Splitter;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;
+
/**
 * Generates word n-grams of every length from 1 up to {@code gramSize} from a line whose tokens are separated by
 * single spaces or tabs. Separators are not collapsed: two consecutive separators yield an empty token.
 */
public class NGrams {

  /** Splits on each individual space or tab, keeping empty tokens (same as the former Guava splitter). */
  private static final Pattern SPACE_TAB = Pattern.compile("[ \t]");

  private final String line;
  private final int gramSize;

  /**
   * @param line text to tokenize; must not be null when one of the generate methods is called
   * @param gramSize maximum n-gram length, at least 1
   * @throws IllegalArgumentException if {@code gramSize} is less than 1
   */
  public NGrams(String line, int gramSize) {
    if (gramSize < 1) {
      // 0 used to fail later with IndexOutOfBoundsException and negatives produced an unbounded window.
      throw new IllegalArgumentException("gramSize must be at least 1: " + gramSize);
    }
    this.line = line;
    this.gramSize = gramSize;
  }

  /**
   * Treats the first token of the line as a label and generates the n-grams of the remaining tokens.
   *
   * @return a single-entry map from the label to its n-grams (an empty list if the line holds only the label)
   */
  public Map<String,List<String>> generateNGrams() {
    String[] tokens = SPACE_TAB.split(line, -1);
    Map<String,List<String>> returnDocument = new HashMap<>();
    // split(..., -1) always yields at least one element, so tokens[0] exists.
    returnDocument.put(tokens[0], nGramsFrom(tokens, 1));
    return returnDocument;
  }

  /**
   * Generates the n-grams of all tokens of the line.
   *
   * @return the n-grams, grouped by the token they end at, shortest first within each group
   */
  public List<String> generateNGramsWithoutLabel() {
    return nGramsFrom(SPACE_TAB.split(line, -1), 0);
  }

  /**
   * Emits, for each token at index {@code from} or later, every n-gram (n = 1..gramSize) ending at that token,
   * shortest first, without reaching back before {@code from}. Each n-gram occurrence is emitted exactly once.
   */
  private List<String> nGramsFrom(String[] tokens, int from) {
    List<String> grams = new ArrayList<>();
    for (int end = from; end < tokens.length; end++) {
      int earliestStart = Math.max(from, end - gramSize + 1);
      StringBuilder gram = new StringBuilder(tokens[end]);
      grams.add(gram.toString());
      for (int start = end - 1; start >= earliestStart; start--) {
        // Prepend the earlier token and its separator to extend the gram one word to the left.
        gram.insert(0, ' ').insert(0, tokens[start]);
        grams.add(gram.toString());
      }
    }
    return grams;
  }
}
http://git-wip-us.apache.org/repos/asf/mahout/blob/5eda9e1f/community/mahout-mr/src/main/java/org/apache/mahout/common/parameters/AbstractParameter.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/src/main/java/org/apache/mahout/common/parameters/AbstractParameter.java b/community/mahout-mr/src/main/java/org/apache/mahout/common/parameters/AbstractParameter.java
new file mode 100644
index 0000000..f0a7aa8
--- /dev/null
+++ b/community/mahout-mr/src/main/java/org/apache/mahout/common/parameters/AbstractParameter.java
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.common.parameters;
+
+import java.util.Collection;
+import java.util.Collections;
+
+import org.apache.hadoop.conf.Configuration;
+
+public abstract class AbstractParameter<T> implements Parameter<T> {
+
+ private T value;
+ private final String prefix;
+ private final String name;
+ private final String description;
+ private final Class<T> type;
+ private final String defaultValue;
+
+ protected AbstractParameter(Class<T> type,
+ String prefix,
+ String name,
+ Configuration jobConf,
+ T defaultValue,
+ String description) {
+ this.type = type;
+ this.name = name;
+ this.description = description;
+
+ this.value = defaultValue;
+ this.defaultValue = getStringValue();
+
+ this.prefix = prefix;
+ String jobConfValue = jobConf.get(prefix + name);
+ if (jobConfValue != null) {
+ setStringValue(jobConfValue);
+ }
+
+ }
+
+ @Override
+ public void configure(Configuration jobConf) {
+ // nothing to do
+ }
+
+ @Override
+ public void createParameters(String prefix, Configuration jobConf) { }
+
+ @Override
+ public String getStringValue() {
+ if (value == null) {
+ return null;
+ }
+ return value.toString();
+ }
+
+ @Override
+ public Collection<Parameter<?>> getParameters() {
+ return Collections.emptyList();
+ }
+
+ @Override
+ public String prefix() {
+ return prefix;
+ }
+
+ @Override
+ public String name() {
+ return name;
+ }
+
+ @Override
+ public String description() {
+ return description;
+ }
+
+ @Override
+ public Class<T> type() {
+ return type;
+ }
+
+ @Override
+ public String defaultValue() {
+ return defaultValue;
+ }
+
+ @Override
+ public T get() {
+ return value;
+ }
+
+ @Override
+ public void set(T value) {
+ this.value = value;
+ }
+
+ @Override
+ public String toString() {
+ if (value != null) {
+ return value.toString();
+ } else {
+ return super.toString();
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/5eda9e1f/community/mahout-mr/src/main/java/org/apache/mahout/common/parameters/ClassParameter.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/src/main/java/org/apache/mahout/common/parameters/ClassParameter.java b/community/mahout-mr/src/main/java/org/apache/mahout/common/parameters/ClassParameter.java
new file mode 100644
index 0000000..1d1c0bb
--- /dev/null
+++ b/community/mahout-mr/src/main/java/org/apache/mahout/common/parameters/ClassParameter.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.common.parameters;
+
+import org.apache.hadoop.conf.Configuration;
+
+public class ClassParameter extends AbstractParameter<Class> {
+
+ public ClassParameter(String prefix, String name, Configuration jobConf, Class<?> defaultValue, String description) {
+ super(Class.class, prefix, name, jobConf, defaultValue, description);
+ }
+
+ @Override
+ public void setStringValue(String stringValue) {
+ try {
+ set(Class.forName(stringValue));
+ } catch (ClassNotFoundException e) {
+ throw new IllegalStateException(e);
+ }
+ }
+
+ @Override
+ public String getStringValue() {
+ if (get() == null) {
+ return null;
+ }
+ return get().getName();
+ }
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/5eda9e1f/community/mahout-mr/src/main/java/org/apache/mahout/common/parameters/DoubleParameter.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/src/main/java/org/apache/mahout/common/parameters/DoubleParameter.java b/community/mahout-mr/src/main/java/org/apache/mahout/common/parameters/DoubleParameter.java
new file mode 100644
index 0000000..cb3efcf
--- /dev/null
+++ b/community/mahout-mr/src/main/java/org/apache/mahout/common/parameters/DoubleParameter.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.common.parameters;
+
+import org.apache.hadoop.conf.Configuration;
+
+public class DoubleParameter extends AbstractParameter<Double> {
+
+ public DoubleParameter(String prefix, String name, Configuration conf, double defaultValue, String description) {
+ super(Double.class, prefix, name, conf, defaultValue, description);
+ }
+
+ @Override
+ public void setStringValue(String stringValue) {
+ set(Double.valueOf(stringValue));
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/5eda9e1f/community/mahout-mr/src/main/java/org/apache/mahout/common/parameters/Parameter.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/src/main/java/org/apache/mahout/common/parameters/Parameter.java b/community/mahout-mr/src/main/java/org/apache/mahout/common/parameters/Parameter.java
new file mode 100644
index 0000000..292fa27
--- /dev/null
+++ b/community/mahout-mr/src/main/java/org/apache/mahout/common/parameters/Parameter.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.common.parameters;
+
+/**
+ * Accessor for a single parameter of a job.
+ *
+ * <p>A parameter is itself {@link Parametered}, so it may expose further parameters. For example, a
+ * parameter that selects which DistanceMeasure class to use could, once set, also expose the
+ * parameters of that DistanceMeasure implementation.
+ *
+ * @param <T> type of the parameter value
+ */
+public interface Parameter<T> extends Parametered {
+
+  /** @return prefix of the job configuration key, e.g. 'org.apache.mahout.util.WeightedDistanceMeasure.' */
+  String prefix();
+
+  /** @return name of the configuration parameter, e.g. 'weightsFile' */
+  String name();
+
+  /** @return human readable description of the parameter */
+  String description();
+
+  /** @return class of the parameter value */
+  Class<T> type();
+
+  /**
+   * Sets the value from its string representation.
+   *
+   * @param stringValue string representation of the new value
+   */
+  void setStringValue(String stringValue);
+
+  /** @return string representation of the current value */
+  String getStringValue();
+
+  /**
+   * Sets the value.
+   *
+   * @param value new parameter value
+   */
+  void set(T value);
+
+  /** @return current parameter value */
+  T get();
+
+  /** @return string form of the value used when the consumer sets none */
+  String defaultValue();
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/5eda9e1f/community/mahout-mr/src/main/java/org/apache/mahout/common/parameters/Parametered.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/src/main/java/org/apache/mahout/common/parameters/Parametered.java b/community/mahout-mr/src/main/java/org/apache/mahout/common/parameters/Parametered.java
new file mode 100644
index 0000000..96c9457
--- /dev/null
+++ b/community/mahout-mr/src/main/java/org/apache/mahout/common/parameters/Parametered.java
@@ -0,0 +1,206 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.common.parameters;
+
+import java.util.Collection;
+
+import org.apache.hadoop.conf.Configuration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Meta information and accessors for configuring a job. */
+public interface Parametered {
+
+  /** Logger used by the configuration helpers in {@link ParameteredGeneralizations}. */
+  Logger log = LoggerFactory.getLogger(Parametered.class);
+
+  /** @return the parameters directly exposed by this instance; iterated without a null check, so must not be null */
+  Collection<Parameter<?>> getParameters();
+
+  /**
+   * EXPERT: consumers should never have to call this method. Ideally it would be visible only to
+   * {@link ParameteredGeneralizations}, but Java interfaces cannot restrict member visibility.
+   * Calling this method should create a new list of parameters, reading values from {@code jobConf}.
+   *
+   * @param prefix
+   *          ends with a dot if not empty.
+   * @param jobConf
+   *          configuration used for retrieving values
+   * @see ParameteredGeneralizations#configureParameters(String,Parametered,Configuration)
+   *      invoking method; it is also invoked on every nested parameter while the composite tree is
+   *      configured recursively
+   */
+  void createParameters(String prefix, Configuration jobConf);
+
+  /**
+   * Hook invoked on each nested parameter right after its {@link #createParameters} call while a
+   * parameter tree is configured by {@link ParameteredGeneralizations}.
+   *
+   * @param config configuration to read settings from
+   */
+  void configure(Configuration config);
+
+  /** "multiple inheritance": static helpers that operate on any {@link Parametered}. */
+  final class ParameteredGeneralizations {
+    private ParameteredGeneralizations() { }
+
+    /**
+     * Configures {@code parametered} using its simple class name followed by a dot as key prefix.
+     *
+     * @param parametered instance to be configured
+     * @param jobConf configuration used for retrieving values
+     */
+    public static void configureParameters(Parametered parametered, Configuration jobConf) {
+      configureParameters(parametered.getClass().getSimpleName() + '.', parametered, jobConf);
+    }
+
+    /**
+     * Calls
+     * {@link Parametered#createParameters(String,org.apache.hadoop.conf.Configuration)}
+     * on {@code parametered}, then recurses down its composite tree to invoke
+     * {@link Parametered#createParameters(String,org.apache.hadoop.conf.Configuration)}
+     * and {@link Parametered#configure(org.apache.hadoop.conf.Configuration)} on
+     * each composite part.
+     *
+     * @param prefix
+     *          ends with a dot if not empty.
+     * @param parametered
+     *          instance to be configured
+     * @param jobConf
+     *          configuration used for retrieving values
+     */
+    public static void configureParameters(String prefix, Parametered parametered, Configuration jobConf) {
+      parametered.createParameters(prefix, jobConf);
+      configureParametersRecursively(parametered, prefix, jobConf);
+    }
+
+    /**
+     * For each parameter of {@code parametered}: creates its nested parameters under the key prefix
+     * {@code prefix + name + '.'}, configures it, and descends into its nested parameters if any.
+     */
+    private static void configureParametersRecursively(Parametered parametered, String prefix, Configuration jobConf) {
+      for (Parameter<?> parameter : parametered.getParameters()) {
+        if (log.isDebugEnabled()) {
+          log.debug("Configuring {}{}", prefix, parameter.name());
+        }
+        String name = prefix + parameter.name() + '.';
+        parameter.createParameters(name, jobConf);
+        parameter.configure(jobConf);
+        if (!parameter.getParameters().isEmpty()) {
+          configureParametersRecursively(parameter, name, jobConf);
+        }
+      }
+    }
+
+    /** @return a listing of every parameter in the tree with column-aligned descriptions */
+    public static String help(Parametered parametered) {
+      return new Help(parametered).toString();
+    }
+
+    /** @return every parameter in the tree rendered as a commented {@code key = value} entry */
+    public static String conf(Parametered parametered) {
+      return new Conf(parametered).toString();
+    }
+
+    /** Null-safe length used only for buffer capacity estimates. */
+    private static int lengthOf(String s) {
+      return s == null ? 0 : s.length();
+    }
+
+    /** Renders "prefix+name, padding, description (default value 'x')" lines, one per parameter. */
+    private static final class Help {
+      /** Minimum number of spaces between the longest key and its description. */
+      static final int NAME_DESC_DISTANCE = 8;
+
+      private final StringBuilder sb;
+      /** Length of the longest full key (prefix + name) anywhere in the tree. */
+      private int longestName;
+      /** Number of parameters anywhere in the tree. */
+      private int parameterCount;
+      private int numChars = 100; // a few extra just to be sure
+
+      private Help(Parametered parametered) {
+        recurseCount(parametered);
+        // each line is padded to longestName + NAME_DESC_DISTANCE columns, plus a newline
+        numChars += (longestName + NAME_DESC_DISTANCE + 1) * parameterCount;
+        sb = new StringBuilder(numChars);
+        recurseWrite(parametered);
+      }
+
+      @Override
+      public String toString() {
+        return sb.toString();
+      }
+
+      private void recurseCount(Parametered parametered) {
+        for (Parameter<?> parameter : parametered.getParameters()) {
+          // Measure the full key that recurseWrite prints; measuring only the name made the padding
+          // negative for long prefixes, gluing the description onto the key.
+          int keyLength = parameter.prefix().length() + parameter.name().length();
+          longestName = Math.max(longestName, keyLength);
+          parameterCount++;
+          numChars += lengthOf(parameter.description());
+          recurseCount(parameter);
+        }
+      }
+
+      private void recurseWrite(Parametered parametered) {
+        for (Parameter<?> parameter : parametered.getParameters()) {
+          sb.append(parameter.prefix());
+          sb.append(parameter.name());
+          int padding = longestName - parameter.prefix().length() - parameter.name().length()
+              + NAME_DESC_DISTANCE;
+          for (int i = 0; i < padding; i++) {
+            sb.append(' ');
+          }
+          sb.append(parameter.description());
+          if (parameter.defaultValue() != null) {
+            sb.append(" (default value '");
+            sb.append(parameter.defaultValue());
+            sb.append("')");
+          }
+          sb.append('\n');
+          recurseWrite(parameter);
+        }
+      }
+    }
+
+    /** Renders each parameter as "# description", "prefix+name = value" and a blank line. */
+    private static final class Conf {
+      private final StringBuilder sb;
+      private int numChars = 100; // a few extra just to be sure
+
+      private Conf(Parametered parametered) {
+        recurseCount(parametered);
+        sb = new StringBuilder(numChars);
+        recurseWrite(parametered);
+      }
+
+      @Override
+      public String toString() {
+        return sb.toString();
+      }
+
+      /** Estimates the output size so the builder is allocated with enough capacity up front. */
+      private void recurseCount(Parametered parametered) {
+        for (Parameter<?> parameter : parametered.getParameters()) {
+          numChars += parameter.prefix().length() + parameter.name().length();
+          numChars += 8; // "# " + '\n' + " = " + "\n\n"
+          numChars += lengthOf(parameter.description());
+          numChars += lengthOf(parameter.getStringValue());
+          recurseCount(parameter);
+        }
+      }
+
+      private void recurseWrite(Parametered parametered) {
+        for (Parameter<?> parameter : parametered.getParameters()) {
+          sb.append("# ");
+          sb.append(parameter.description());
+          sb.append('\n');
+          sb.append(parameter.prefix());
+          sb.append(parameter.name());
+          sb.append(" = ");
+          if (parameter.getStringValue() != null) {
+            sb.append(parameter.getStringValue());
+          }
+          sb.append('\n');
+          sb.append('\n');
+          recurseWrite(parameter);
+        }
+      }
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/5eda9e1f/community/mahout-mr/src/main/java/org/apache/mahout/common/parameters/PathParameter.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/src/main/java/org/apache/mahout/common/parameters/PathParameter.java b/community/mahout-mr/src/main/java/org/apache/mahout/common/parameters/PathParameter.java
new file mode 100644
index 0000000..a617fe3
--- /dev/null
+++ b/community/mahout-mr/src/main/java/org/apache/mahout/common/parameters/PathParameter.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.common.parameters;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+
+public class PathParameter extends AbstractParameter<Path> {
+
+ public PathParameter(String prefix, String name, Configuration jobConf, Path defaultValue, String description) {
+ super(Path.class, prefix, name, jobConf, defaultValue, description);
+ }
+
+ @Override
+ public void setStringValue(String stringValue) {
+ set(new Path(stringValue));
+ }
+}