You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mahout.apache.org by pa...@apache.org on 2015/04/01 20:07:53 UTC
[22/51] [partial] mahout git commit: MAHOUT-1655 Refactors mr-legacy
into mahout-hdfs and mahout-mr, closes apache/mahout#86
http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/common/iterator/sequencefile/SequenceFileValueIterable.java
----------------------------------------------------------------------
diff --git a/mr/src/main/java/org/apache/mahout/common/iterator/sequencefile/SequenceFileValueIterable.java b/mr/src/main/java/org/apache/mahout/common/iterator/sequencefile/SequenceFileValueIterable.java
new file mode 100644
index 0000000..d2fdf8d
--- /dev/null
+++ b/mr/src/main/java/org/apache/mahout/common/iterator/sequencefile/SequenceFileValueIterable.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.common.iterator.sequencefile;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * An {@link Iterable} over the values of a single sequence file. Every call to {@link #iterator()}
+ * constructs a new {@link SequenceFileValueIterator} for the same path and configuration.
+ */
+public final class SequenceFileValueIterable<V extends Writable> implements Iterable<V> {
+
+  private final Path path;
+  private final boolean reuseKeyValueInstances;
+  private final Configuration conf;
+
+  /**
+   * Shorthand for {@link #SequenceFileValueIterable(Path, boolean, Configuration)} with instance
+   * reuse switched off.
+   *
+   * @param path file to iterate over
+   * @param conf configuration passed on to each iterator that gets created
+   */
+  public SequenceFileValueIterable(Path path, Configuration conf) {
+    this(path, false, conf);
+  }
+
+  /**
+   * @param path file to iterate over
+   * @param reuseKeyValueInstances if true, the iterator reuses one value object across reads instead
+   *  of allocating a new one for each record
+   * @param conf configuration passed on to each iterator that gets created
+   */
+  public SequenceFileValueIterable(Path path, boolean reuseKeyValueInstances, Configuration conf) {
+    this.path = path;
+    this.reuseKeyValueInstances = reuseKeyValueInstances;
+    this.conf = conf;
+  }
+
+  /**
+   * @return a new iterator over the file's values
+   * @throws IllegalStateException wrapping the {@link IOException} raised while creating the
+   *  iterator; the message is the path
+   */
+  @Override
+  public Iterator<V> iterator() {
+    Iterator<V> values;
+    try {
+      values = new SequenceFileValueIterator<>(path, reuseKeyValueInstances, conf);
+    } catch (IOException ioe) {
+      throw new IllegalStateException(path.toString(), ioe);
+    }
+    return values;
+  }
+
+}
+
http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/common/iterator/sequencefile/SequenceFileValueIterator.java
----------------------------------------------------------------------
diff --git a/mr/src/main/java/org/apache/mahout/common/iterator/sequencefile/SequenceFileValueIterator.java b/mr/src/main/java/org/apache/mahout/common/iterator/sequencefile/SequenceFileValueIterator.java
new file mode 100644
index 0000000..49d64c7
--- /dev/null
+++ b/mr/src/main/java/org/apache/mahout/common/iterator/sequencefile/SequenceFileValueIterator.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.common.iterator.sequencefile;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+import com.google.common.collect.AbstractIterator;
+import com.google.common.io.Closeables;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * <p>{@link java.util.Iterator} over a {@link SequenceFile}'s values only.</p>
+ */
+public final class SequenceFileValueIterator<V extends Writable> extends AbstractIterator<V> implements Closeable {
+
+ private final SequenceFile.Reader reader;
+ private final Configuration conf;
+ private final Class<V> valueClass;
+ private final Writable key;
+ private V value;
+ private final boolean reuseKeyValueInstances;
+
+ private static final Logger log = LoggerFactory.getLogger(SequenceFileValueIterator.class);
+
+ /**
+ * @throws IOException if path can't be read, or its key or value class can't be instantiated
+ */
+
+ public SequenceFileValueIterator(Path path, boolean reuseKeyValueInstances, Configuration conf) throws IOException {
+ value = null;
+ FileSystem fs = path.getFileSystem(conf);
+ path = path.makeQualified(path.toUri(), path);
+ reader = new SequenceFile.Reader(fs, path, conf);
+ this.conf = conf;
+ Class<? extends Writable> keyClass = (Class<? extends Writable>) reader.getKeyClass();
+ key = ReflectionUtils.newInstance(keyClass, conf);
+ valueClass = (Class<V>) reader.getValueClass();
+ this.reuseKeyValueInstances = reuseKeyValueInstances;
+ }
+
+ public Class<V> getValueClass() {
+ return valueClass;
+ }
+
+ @Override
+ public void close() throws IOException {
+ value = null;
+ Closeables.close(reader, true);
+ endOfData();
+ }
+
+ @Override
+ protected V computeNext() {
+ if (!reuseKeyValueInstances || value == null) {
+ value = ReflectionUtils.newInstance(valueClass, conf);
+ }
+ try {
+ boolean available = reader.next(key, value);
+ if (!available) {
+ close();
+ return null;
+ }
+ return value;
+ } catch (IOException ioe) {
+ try {
+ close();
+ } catch (IOException e) {
+ log.error(e.getMessage(), e);
+ }
+ throw new IllegalStateException(ioe);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/common/lucene/AnalyzerUtils.java
----------------------------------------------------------------------
diff --git a/mr/src/main/java/org/apache/mahout/common/lucene/AnalyzerUtils.java b/mr/src/main/java/org/apache/mahout/common/lucene/AnalyzerUtils.java
new file mode 100644
index 0000000..37ca383
--- /dev/null
+++ b/mr/src/main/java/org/apache/mahout/common/lucene/AnalyzerUtils.java
@@ -0,0 +1,61 @@
+package org.apache.mahout.common.lucene;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.lucene.analysis.Analyzer;
+import org.apache.lucene.util.Version;
+import org.apache.mahout.common.ClassUtils;
+
+public final class AnalyzerUtils {
+
+ private AnalyzerUtils() {}
+
+ /**
+ * Create an Analyzer using the latest {@link org.apache.lucene.util.Version}. Note, if you need to pass in
+ * parameters to your constructor, you will need to wrap it in an implementation that does not take any arguments
+ * @param analyzerClassName - Lucene Analyzer Name
+ * @return {@link Analyzer}
+ * @throws ClassNotFoundException - {@link ClassNotFoundException}
+ */
+ public static Analyzer createAnalyzer(String analyzerClassName) throws ClassNotFoundException {
+ return createAnalyzer(analyzerClassName, Version.LUCENE_46);
+ }
+
+ public static Analyzer createAnalyzer(String analyzerClassName, Version version) throws ClassNotFoundException {
+ Class<? extends Analyzer> analyzerClass = Class.forName(analyzerClassName).asSubclass(Analyzer.class);
+ return createAnalyzer(analyzerClass, version);
+ }
+
+ /**
+ * Create an Analyzer using the latest {@link org.apache.lucene.util.Version}. Note, if you need to pass in
+ * parameters to your constructor, you will need to wrap it in an implementation that does not take any arguments
+ * @param analyzerClass The Analyzer Class to instantiate
+ * @return {@link Analyzer}
+ */
+ public static Analyzer createAnalyzer(Class<? extends Analyzer> analyzerClass) {
+ return createAnalyzer(analyzerClass, Version.LUCENE_46);
+ }
+
+ public static Analyzer createAnalyzer(Class<? extends Analyzer> analyzerClass, Version version) {
+ try {
+ return ClassUtils.instantiateAs(analyzerClass, Analyzer.class,
+ new Class<?>[] { Version.class }, new Object[] { version });
+ } catch (IllegalStateException e) {
+ return ClassUtils.instantiateAs(analyzerClass, Analyzer.class);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/common/lucene/IteratorTokenStream.java
----------------------------------------------------------------------
diff --git a/mr/src/main/java/org/apache/mahout/common/lucene/IteratorTokenStream.java b/mr/src/main/java/org/apache/mahout/common/lucene/IteratorTokenStream.java
new file mode 100644
index 0000000..5facad8
--- /dev/null
+++ b/mr/src/main/java/org/apache/mahout/common/lucene/IteratorTokenStream.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.common.lucene;
+
+import org.apache.lucene.analysis.TokenStream;
+import org.apache.lucene.analysis.tokenattributes.CharTermAttribute;
+
+import java.util.Iterator;
+
+/** A {@link TokenStream} that emits each string supplied by an {@link Iterator} as one token. */
+public final class IteratorTokenStream extends TokenStream {
+  private final CharTermAttribute termAtt;
+  private final Iterator<String> iterator;
+
+  /**
+   * @param iterator source of the token strings, consumed one element per {@link #incrementToken()}
+   */
+  public IteratorTokenStream(Iterator<String> iterator) {
+    this.iterator = iterator;
+    this.termAtt = addAttribute(CharTermAttribute.class);
+  }
+
+  /**
+   * Clears the attributes and loads the next string into the term attribute.
+   *
+   * @return false once the iterator has no more elements, true otherwise
+   */
+  @Override
+  public boolean incrementToken() {
+    if (!iterator.hasNext()) {
+      return false;
+    }
+    clearAttributes();
+    termAtt.append(iterator.next());
+    return true;
+  }
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/common/lucene/TokenStreamIterator.java
----------------------------------------------------------------------
diff --git a/mr/src/main/java/org/apache/mahout/common/lucene/TokenStreamIterator.java b/mr/src/main/java/org/apache/mahout/common/lucene/TokenStreamIterator.java
new file mode 100644
index 0000000..af60d8b
--- /dev/null
+++ b/mr/src/main/java/org/apache/mahout/common/lucene/TokenStreamIterator.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.common.lucene;
+
+import com.google.common.collect.AbstractIterator;
+import org.apache.lucene.analysis.TokenStream;
+import org.apache.lucene.analysis.tokenattributes.CharTermAttribute;
+
+import java.io.IOException;
+
+/**
+ * Exposes the tokens of a {@link org.apache.lucene.analysis.TokenStream} as an Iterator of Strings.
+ *
+ * Note, the instantiating class remains responsible for properly consuming the
+ * {@link org.apache.lucene.analysis.TokenStream}; see the Lucene
+ * {@link org.apache.lucene.analysis.TokenStream} documentation for the required call sequence.
+ * This iterator itself only calls {@code incrementToken()}, and {@code end()} / {@code close()}
+ * once the stream is exhausted.
+ */
+//TODO: consider using the char/byte arrays instead of strings, esp. when we upgrade to Lucene 4.0
+public final class TokenStreamIterator extends AbstractIterator<String> {
+
+  private final TokenStream tokenStream;
+
+  /**
+   * @param tokenStream stream whose tokens are to be iterated
+   */
+  public TokenStreamIterator(TokenStream tokenStream) {
+    this.tokenStream = tokenStream;
+  }
+
+  /**
+   * @return the text of the next token, or the end-of-data marker after ending and closing the
+   *  stream when no token is left
+   * @throws IllegalStateException wrapping any {@link IOException} raised by the stream
+   */
+  @Override
+  protected String computeNext() {
+    try {
+      if (!tokenStream.incrementToken()) {
+        tokenStream.end();
+        tokenStream.close();
+        return endOfData();
+      }
+      return tokenStream.getAttribute(CharTermAttribute.class).toString();
+    } catch (IOException e) {
+      throw new IllegalStateException("IO error while tokenizing", e);
+    }
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/common/mapreduce/MergeVectorsCombiner.java
----------------------------------------------------------------------
diff --git a/mr/src/main/java/org/apache/mahout/common/mapreduce/MergeVectorsCombiner.java b/mr/src/main/java/org/apache/mahout/common/mapreduce/MergeVectorsCombiner.java
new file mode 100644
index 0000000..8e0385d
--- /dev/null
+++ b/mr/src/main/java/org/apache/mahout/common/mapreduce/MergeVectorsCombiner.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.common.mapreduce;
+
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.mahout.math.VectorWritable;
+
+import java.io.IOException;
+
+/**
+ * Combiner that collapses all vectors seen for a key into the single result of
+ * {@code VectorWritable.merge} and writes it under the same key.
+ */
+public class MergeVectorsCombiner
+    extends Reducer<WritableComparable<?>,VectorWritable,WritableComparable<?>,VectorWritable> {
+
+  @Override
+  public void reduce(WritableComparable<?> key, Iterable<VectorWritable> vectors, Context ctx)
+    throws IOException, InterruptedException {
+    VectorWritable merged = VectorWritable.merge(vectors.iterator());
+    ctx.write(key, merged);
+  }
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/common/mapreduce/MergeVectorsReducer.java
----------------------------------------------------------------------
diff --git a/mr/src/main/java/org/apache/mahout/common/mapreduce/MergeVectorsReducer.java b/mr/src/main/java/org/apache/mahout/common/mapreduce/MergeVectorsReducer.java
new file mode 100644
index 0000000..b8d5dea
--- /dev/null
+++ b/mr/src/main/java/org/apache/mahout/common/mapreduce/MergeVectorsReducer.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.common.mapreduce;
+
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.mahout.math.SequentialAccessSparseVector;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.VectorWritable;
+
+import java.io.IOException;
+
+/**
+ * Reducer that merges all vectors seen for a key via {@code VectorWritable.merge}, copies the
+ * merged vector into a {@link SequentialAccessSparseVector} and writes it under the same key.
+ * One {@link VectorWritable} instance is reused as the output holder for every key.
+ */
+public class MergeVectorsReducer extends
+    Reducer<WritableComparable<?>,VectorWritable,WritableComparable<?>,VectorWritable> {
+
+  private final VectorWritable result = new VectorWritable();
+
+  @Override
+  public void reduce(WritableComparable<?> key, Iterable<VectorWritable> vectors, Context ctx)
+    throws IOException, InterruptedException {
+    VectorWritable mergedWritable = VectorWritable.merge(vectors.iterator());
+    Vector merged = mergedWritable.get();
+    result.set(new SequentialAccessSparseVector(merged));
+    ctx.write(key, result);
+  }
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/common/mapreduce/TransposeMapper.java
----------------------------------------------------------------------
diff --git a/mr/src/main/java/org/apache/mahout/common/mapreduce/TransposeMapper.java b/mr/src/main/java/org/apache/mahout/common/mapreduce/TransposeMapper.java
new file mode 100644
index 0000000..c6c3f05
--- /dev/null
+++ b/mr/src/main/java/org/apache/mahout/common/mapreduce/TransposeMapper.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.common.mapreduce;
+
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.mahout.math.RandomAccessSparseVector;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.VectorWritable;
+
+import java.io.IOException;
+
+/**
+ * Mapper for a matrix transpose: for every non-zero element of an input row it emits, keyed by the
+ * element's column index, a sparse vector holding that single value at the input row's index.
+ */
+public class TransposeMapper extends Mapper<IntWritable,VectorWritable,IntWritable,VectorWritable> {
+
+  /** Configuration key for the cardinality of the emitted vectors. */
+  public static final String NEW_NUM_COLS_PARAM = TransposeMapper.class.getName() + ".newNumCols";
+
+  private int newNumCols;
+
+  /** Reads {@link #NEW_NUM_COLS_PARAM} from the job configuration, defaulting to Integer.MAX_VALUE. */
+  @Override
+  protected void setup(Context ctx) throws IOException, InterruptedException {
+    newNumCols = ctx.getConfiguration().getInt(NEW_NUM_COLS_PARAM, Integer.MAX_VALUE);
+  }
+
+  @Override
+  protected void map(IntWritable r, VectorWritable v, Context ctx) throws IOException, InterruptedException {
+    // Capture the row index first: the key object is overwritten below and reused as output key.
+    int sourceRow = r.get();
+    Vector rowVector = v.get();
+    for (Vector.Element element : rowVector.nonZeroes()) {
+      RandomAccessSparseVector column = new RandomAccessSparseVector(newNumCols, 1);
+      column.setQuick(sourceRow, element.get());
+      r.set(element.index());
+      ctx.write(r, new VectorWritable(column));
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/common/mapreduce/VectorSumCombiner.java
----------------------------------------------------------------------
diff --git a/mr/src/main/java/org/apache/mahout/common/mapreduce/VectorSumCombiner.java b/mr/src/main/java/org/apache/mahout/common/mapreduce/VectorSumCombiner.java
new file mode 100644
index 0000000..1d93386
--- /dev/null
+++ b/mr/src/main/java/org/apache/mahout/common/mapreduce/VectorSumCombiner.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.common.mapreduce;
+
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.mahout.math.VectorWritable;
+import org.apache.mahout.math.hadoop.similarity.cooccurrence.Vectors;
+
+import java.io.IOException;
+
+/**
+ * Combiner that replaces all vectors seen for a key with the result of {@code Vectors.sum} over
+ * them. One {@link VectorWritable} instance is reused as the output holder for every key.
+ */
+public class VectorSumCombiner
+    extends Reducer<WritableComparable<?>, VectorWritable, WritableComparable<?>, VectorWritable> {
+
+  private final VectorWritable result = new VectorWritable();
+
+  @Override
+  protected void reduce(WritableComparable<?> key, Iterable<VectorWritable> values, Context ctx)
+    throws IOException, InterruptedException {
+    // Store the partial sum in the shared holder, then emit it under the unchanged key.
+    this.result.set(Vectors.sum(values.iterator()));
+    ctx.write(key, this.result);
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/common/mapreduce/VectorSumReducer.java
----------------------------------------------------------------------
diff --git a/mr/src/main/java/org/apache/mahout/common/mapreduce/VectorSumReducer.java b/mr/src/main/java/org/apache/mahout/common/mapreduce/VectorSumReducer.java
new file mode 100644
index 0000000..97d3805
--- /dev/null
+++ b/mr/src/main/java/org/apache/mahout/common/mapreduce/VectorSumReducer.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.common.mapreduce;
+
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.mahout.math.VectorWritable;
+import org.apache.mahout.math.hadoop.similarity.cooccurrence.Vectors;
+
+import java.io.IOException;
+
+/**
+ * Reducer that writes, for each key, a new {@link VectorWritable} wrapping the result of
+ * {@code Vectors.sum} over all vectors seen for that key.
+ */
+public class VectorSumReducer
+    extends Reducer<WritableComparable<?>, VectorWritable, WritableComparable<?>, VectorWritable> {
+
+  @Override
+  protected void reduce(WritableComparable<?> key, Iterable<VectorWritable> values, Context ctx)
+    throws IOException, InterruptedException {
+    VectorWritable summed = new VectorWritable(Vectors.sum(values.iterator()));
+    ctx.write(key, summed);
+  }
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/common/nlp/NGrams.java
----------------------------------------------------------------------
diff --git a/mr/src/main/java/org/apache/mahout/common/nlp/NGrams.java b/mr/src/main/java/org/apache/mahout/common/nlp/NGrams.java
new file mode 100644
index 0000000..7adadc1
--- /dev/null
+++ b/mr/src/main/java/org/apache/mahout/common/nlp/NGrams.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.common.nlp;
+
+import com.google.common.base.CharMatcher;
+import com.google.common.base.Splitter;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Generates word n-grams from one line of text split on single spaces and tabs.
+ *
+ * <p>A window of at most {@code gramSize} most recent words is kept. After each word is added,
+ * every prefix of the window (joined with single spaces) is emitted. For example the words
+ * {@code a b c} with {@code gramSize} 2 yield {@code [a, a, a b, b, b c]}.</p>
+ */
+public class NGrams {
+
+  private static final Splitter SPACE_TAB = Splitter.on(CharMatcher.anyOf(" \t"));
+
+  private final String line;
+  private final int gramSize;
+
+  /**
+   * @param line text to tokenize
+   * @param gramSize maximum number of words kept in the sliding window
+   */
+  public NGrams(String line, int gramSize) {
+    this.line = line;
+    this.gramSize = gramSize;
+  }
+
+  /**
+   * Treats the first token of the line as a label and builds n-grams from the remaining tokens.
+   *
+   * @return a single-entry map from the label to the list of generated n-grams
+   */
+  public Map<String,List<String>> generateNGrams() {
+    Map<String,List<String>> returnDocument = Maps.newHashMap();
+    Iterator<String> tokenizer = SPACE_TAB.split(line).iterator();
+    String labelName = tokenizer.next();
+    returnDocument.put(labelName, collectGrams(tokenizer));
+    return returnDocument;
+  }
+
+  /**
+   * Builds n-grams from every token of the line, without interpreting the first one as a label.
+   *
+   * @return the list of generated n-grams
+   */
+  public List<String> generateNGramsWithoutLabel() {
+    return collectGrams(SPACE_TAB.split(line).iterator());
+  }
+
+  /** Consumes the remaining words and emits, per word, all prefixes of the current window. */
+  private List<String> collectGrams(Iterator<String> words) {
+    List<String> tokens = Lists.newArrayList();
+    List<String> window = Lists.newArrayList();
+    while (words.hasNext()) {
+      String nextToken = words.next();
+      // Drop the oldest word once the window is full so it never exceeds gramSize entries.
+      if (window.size() == gramSize) {
+        window.remove(0);
+      }
+      window.add(nextToken);
+
+      StringBuilder gramBuilder = new StringBuilder();
+      for (String gram : window) {
+        gramBuilder.append(gram);
+        tokens.add(gramBuilder.toString());
+        gramBuilder.append(' ');
+      }
+    }
+    return tokens;
+  }
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/common/parameters/AbstractParameter.java
----------------------------------------------------------------------
diff --git a/mr/src/main/java/org/apache/mahout/common/parameters/AbstractParameter.java b/mr/src/main/java/org/apache/mahout/common/parameters/AbstractParameter.java
new file mode 100644
index 0000000..f0a7aa8
--- /dev/null
+++ b/mr/src/main/java/org/apache/mahout/common/parameters/AbstractParameter.java
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.common.parameters;
+
+import java.util.Collection;
+import java.util.Collections;
+
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Base {@link Parameter} implementation holding a typed value together with its prefix, name,
+ * description and the string form of its default. It exposes no child parameters.
+ */
+public abstract class AbstractParameter<T> implements Parameter<T> {
+
+  private T value;
+  private final String prefix;
+  private final String name;
+  private final String description;
+  private final Class<T> type;
+  private final String defaultValue;
+
+  /**
+   * Initializes the parameter with its default and then, if the configuration holds an entry under
+   * {@code prefix + name}, overrides the value through {@link #setStringValue(String)}.
+   *
+   * @param type class of the parameter value
+   * @param prefix prefix prepended to the name to form the configuration key
+   * @param name parameter name
+   * @param jobConf configuration searched for an override
+   * @param defaultValue initial value; its string form is remembered as the default
+   * @param description human-readable description
+   */
+  protected AbstractParameter(Class<T> type,
+                              String prefix,
+                              String name,
+                              Configuration jobConf,
+                              T defaultValue,
+                              String description) {
+    this.type = type;
+    this.name = name;
+    this.description = description;
+
+    // Order matters: the value must be in place before its string form is captured as default.
+    this.value = defaultValue;
+    this.defaultValue = getStringValue();
+
+    this.prefix = prefix;
+    String configured = jobConf.get(prefix + name);
+    if (configured != null) {
+      setStringValue(configured);
+    }
+  }
+
+  @Override
+  public void configure(Configuration jobConf) {
+    // nothing to do
+  }
+
+  @Override
+  public void createParameters(String prefix, Configuration jobConf) { }
+
+  /** @return {@code toString()} of the current value, or null when no value is set */
+  @Override
+  public String getStringValue() {
+    return value == null ? null : value.toString();
+  }
+
+  /** @return an empty collection */
+  @Override
+  public Collection<Parameter<?>> getParameters() {
+    return Collections.emptyList();
+  }
+
+  @Override
+  public String prefix() {
+    return prefix;
+  }
+
+  @Override
+  public String name() {
+    return name;
+  }
+
+  @Override
+  public String description() {
+    return description;
+  }
+
+  @Override
+  public Class<T> type() {
+    return type;
+  }
+
+  /** @return the string form of the default value as captured at construction time */
+  @Override
+  public String defaultValue() {
+    return defaultValue;
+  }
+
+  @Override
+  public T get() {
+    return value;
+  }
+
+  @Override
+  public void set(T value) {
+    this.value = value;
+  }
+
+  /** @return the value's own string form, or the inherited {@code toString()} when it is null */
+  @Override
+  public String toString() {
+    return value != null ? value.toString() : super.toString();
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/common/parameters/ClassParameter.java
----------------------------------------------------------------------
diff --git a/mr/src/main/java/org/apache/mahout/common/parameters/ClassParameter.java b/mr/src/main/java/org/apache/mahout/common/parameters/ClassParameter.java
new file mode 100644
index 0000000..1d1c0bb
--- /dev/null
+++ b/mr/src/main/java/org/apache/mahout/common/parameters/ClassParameter.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.common.parameters;
+
+import org.apache.hadoop.conf.Configuration;
+
+public class ClassParameter extends AbstractParameter<Class> {
+
+ public ClassParameter(String prefix, String name, Configuration jobConf, Class<?> defaultValue, String description) {
+ super(Class.class, prefix, name, jobConf, defaultValue, description);
+ }
+
+ @Override
+ public void setStringValue(String stringValue) {
+ try {
+ set(Class.forName(stringValue));
+ } catch (ClassNotFoundException e) {
+ throw new IllegalStateException(e);
+ }
+ }
+
+ @Override
+ public String getStringValue() {
+ if (get() == null) {
+ return null;
+ }
+ return get().getName();
+ }
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/common/parameters/DoubleParameter.java
----------------------------------------------------------------------
diff --git a/mr/src/main/java/org/apache/mahout/common/parameters/DoubleParameter.java b/mr/src/main/java/org/apache/mahout/common/parameters/DoubleParameter.java
new file mode 100644
index 0000000..cb3efcf
--- /dev/null
+++ b/mr/src/main/java/org/apache/mahout/common/parameters/DoubleParameter.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.common.parameters;
+
+import org.apache.hadoop.conf.Configuration;
+
+public class DoubleParameter extends AbstractParameter<Double> {
+
+ public DoubleParameter(String prefix, String name, Configuration conf, double defaultValue, String description) {
+ super(Double.class, prefix, name, conf, defaultValue, description);
+ }
+
+ @Override
+ public void setStringValue(String stringValue) {
+ set(Double.valueOf(stringValue));
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/common/parameters/Parameter.java
----------------------------------------------------------------------
diff --git a/mr/src/main/java/org/apache/mahout/common/parameters/Parameter.java b/mr/src/main/java/org/apache/mahout/common/parameters/Parameter.java
new file mode 100644
index 0000000..292fa27
--- /dev/null
+++ b/mr/src/main/java/org/apache/mahout/common/parameters/Parameter.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.common.parameters;
+
+/**
+ * An accessor to a parameters in the job.
+ *
+ * This is a composite entity that can it self contain more parameters. Say the parameters describes what
+ * DistanceMeasure class to use, once set this parameters would also produce the parameters available in that
+ * DistanceMeasure implementation.
+ */
+public interface Parameter<T> extends Parametered {
+ /** @return job configuration setting key prefix, e.g. 'org.apache.mahout.util.WeightedDistanceMeasure.' */
+ String prefix();
+
+ /** @return configuration parameters name, e.g. 'weightsFile' */
+ String name();
+
+ /** @return human readable description of parameters */
+ String description();
+
+ /** @return value class type */
+ Class<T> type();
+
+ /**
+ * @param stringValue
+ * value string representation
+ */
+ void setStringValue(String stringValue);
+
+ /**
+ * @return value string representation of current value
+ */
+ String getStringValue();
+
+ /**
+ * @param value
+ * new parameters value
+ */
+ void set(T value);
+
+ /** @return current parameters value */
+ T get();
+
+ /** @return value used if not set by consumer */
+ String defaultValue();
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/common/parameters/Parametered.java
----------------------------------------------------------------------
diff --git a/mr/src/main/java/org/apache/mahout/common/parameters/Parametered.java b/mr/src/main/java/org/apache/mahout/common/parameters/Parametered.java
new file mode 100644
index 0000000..96c9457
--- /dev/null
+++ b/mr/src/main/java/org/apache/mahout/common/parameters/Parametered.java
@@ -0,0 +1,206 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.common.parameters;
+
+import java.util.Collection;
+
+import org.apache.hadoop.conf.Configuration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Meta information and accessors for configuring a job. */
+ public interface Parametered {
+
+   Logger log = LoggerFactory.getLogger(Parametered.class);
+
+   /** @return the parameters currently exposed by this instance; walked recursively by the helpers below */
+   Collection<Parameter<?>> getParameters();
+
+   /**
+    * EXPERT: consumers should never have to call this method. It would be friendly visible to
+    * {@link ParameteredGeneralizations} if java supported it. Calling this method should create a new list of
+    * parameters; it is invoked by {@link ParameteredGeneralizations} while a parameter tree is configured.
+    *
+    * @param prefix
+    *          ends with a dot if not empty.
+    * @param jobConf
+    *          configuration used for retrieving values
+    * @see ParameteredGeneralizations#configureParameters(String,Parametered,Configuration)
+    *      invoking method
+    * @see ParameteredGeneralizations#configureParametersRecursively(Parametered,String,Configuration)
+    *      invoking method
+    */
+   void createParameters(String prefix, Configuration jobConf);
+
+   /**
+    * Invoked for every nested parameter after its {@link #createParameters(String, Configuration)} call.
+    *
+    * @param config job configuration
+    */
+   void configure(Configuration config);
+
+   /** "multiple inheritance" */
+   final class ParameteredGeneralizations {
+     private ParameteredGeneralizations() { }
+
+     /**
+      * Configures {@code parametered} using its simple class name followed by a dot as key prefix.
+      *
+      * @param parametered instance to be configured
+      * @param jobConf     configuration used for retrieving values
+      */
+     public static void configureParameters(Parametered parametered, Configuration jobConf) {
+       configureParameters(parametered.getClass().getSimpleName() + '.',
+                           parametered, jobConf);
+
+     }
+
+     /**
+      * Calls
+      * {@link Parametered#createParameters(String,org.apache.hadoop.conf.Configuration)}
+      * on parameter parametered, and then recurs down its composite tree to invoke
+      * {@link Parametered#createParameters(String,org.apache.hadoop.conf.Configuration)}
+      * and {@link Parametered#configure(org.apache.hadoop.conf.Configuration)} on
+      * each composite part.
+      *
+      * @param prefix
+      *          ends with a dot if not empty.
+      * @param parametered
+      *          instance to be configured
+      * @param jobConf
+      *          configuration used for retrieving values
+      */
+     public static void configureParameters(String prefix, Parametered parametered, Configuration jobConf) {
+       parametered.createParameters(prefix, jobConf);
+       configureParametersRecursively(parametered, prefix, jobConf);
+     }
+
+     /** Creates and configures every nested parameter, extending the prefix by the parameter name per level. */
+     private static void configureParametersRecursively(Parametered parametered, String prefix, Configuration jobConf) {
+       for (Parameter<?> parameter : parametered.getParameters()) {
+         if (log.isDebugEnabled()) {
+           log.debug("Configuring {}{}", prefix, parameter.name());
+         }
+         String name = prefix + parameter.name() + '.';
+         parameter.createParameters(name, jobConf);
+         parameter.configure(jobConf);
+         if (!parameter.getParameters().isEmpty()) {
+           configureParametersRecursively(parameter, name, jobConf);
+         }
+       }
+     }
+
+     /**
+      * @param parametered root of the parameter tree
+      * @return one line per parameter: prefixed name, padding, description and, if present, the default value
+      */
+     public static String help(Parametered parametered) {
+       return new Help(parametered).toString();
+     }
+
+     /**
+      * @param parametered root of the parameter tree
+      * @return per parameter a "# description" line followed by "prefix+name = current value" and a blank line
+      */
+     public static String conf(Parametered parametered) {
+       return new Conf(parametered).toString();
+     }
+
+     /** Renders the help text; built in two passes (measure, then write). */
+     private static final class Help {
+       static final int NAME_DESC_DISTANCE = 8;
+
+       private final StringBuilder sb;
+       // length of the longest "prefix + name" in the tree; fixes the column where descriptions start
+       private int longestName;
+       private int numChars = 100; // a few extra just to be sure
+
+       private Help(Parametered parametered) {
+         recurseCount(parametered);
+         numChars += (longestName + NAME_DESC_DISTANCE) * parametered.getParameters().size();
+         sb = new StringBuilder(numChars);
+         recurseWrite(parametered);
+       }
+
+       @Override
+       public String toString() {
+         return sb.toString();
+       }
+
+       private void recurseCount(Parametered parametered) {
+         for (Parameter<?> parameter : parametered.getParameters()) {
+           // Measure exactly what recurseWrite prints in front of the padding (prefix + name).
+           // Measuring the bare name made the padding shrink, or vanish, for prefixed parameters.
+           int parameterNameLength = parameter.prefix().length() + parameter.name().length();
+           if (parameterNameLength > longestName) {
+             longestName = parameterNameLength;
+           }
+           recurseCount(parameter);
+           numChars += parameter.description().length();
+         }
+       }
+
+       private void recurseWrite(Parametered parametered) {
+         for (Parameter<?> parameter : parametered.getParameters()) {
+           sb.append(parameter.prefix());
+           sb.append(parameter.name());
+           // always >= NAME_DESC_DISTANCE because longestName covers prefix + name
+           int max = longestName - parameter.name().length() - parameter.prefix().length()
+               + NAME_DESC_DISTANCE;
+           for (int i = 0; i < max; i++) {
+             sb.append(' ');
+           }
+           sb.append(parameter.description());
+           if (parameter.defaultValue() != null) {
+             sb.append(" (default value '");
+             sb.append(parameter.defaultValue());
+             sb.append("')");
+           }
+           sb.append('\n');
+           recurseWrite(parameter);
+         }
+       }
+     }
+
+     /** Renders the configuration dump; built in two passes (size estimate, then write). */
+     private static final class Conf {
+       private final StringBuilder sb;
+       private int longestName;
+       private int numChars = 100; // a few extra just to be sure
+
+       private Conf(Parametered parametered) {
+         recurseCount(parametered);
+         sb = new StringBuilder(numChars);
+         recurseWrite(parametered);
+       }
+
+       @Override
+       public String toString() {
+         return sb.toString();
+       }
+
+       private void recurseCount(Parametered parametered) {
+         for (Parameter<?> parameter : parametered.getParameters()) {
+           int parameterNameLength = parameter.prefix().length() + parameter.name().length();
+           if (parameterNameLength > longestName) {
+             longestName = parameterNameLength;
+           }
+
+           numChars += parameterNameLength;
+           numChars += 5; // # $0\n$1 = $2\n\n
+           numChars += parameter.description().length();
+           if (parameter.getStringValue() != null) {
+             numChars += parameter.getStringValue().length();
+           }
+
+           recurseCount(parameter);
+         }
+       }
+
+       private void recurseWrite(Parametered parametered) {
+         for (Parameter<?> parameter : parametered.getParameters()) {
+           sb.append("# ");
+           sb.append(parameter.description());
+           sb.append('\n');
+           sb.append(parameter.prefix());
+           sb.append(parameter.name());
+           sb.append(" = ");
+           if (parameter.getStringValue() != null) {
+             sb.append(parameter.getStringValue());
+           }
+           sb.append('\n');
+           sb.append('\n');
+           recurseWrite(parameter);
+         }
+       }
+     }
+   }
+ }
http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/common/parameters/PathParameter.java
----------------------------------------------------------------------
diff --git a/mr/src/main/java/org/apache/mahout/common/parameters/PathParameter.java b/mr/src/main/java/org/apache/mahout/common/parameters/PathParameter.java
new file mode 100644
index 0000000..a617fe3
--- /dev/null
+++ b/mr/src/main/java/org/apache/mahout/common/parameters/PathParameter.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.common.parameters;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+
+public class PathParameter extends AbstractParameter<Path> {
+
+ public PathParameter(String prefix, String name, Configuration jobConf, Path defaultValue, String description) {
+ super(Path.class, prefix, name, jobConf, defaultValue, description);
+ }
+
+ @Override
+ public void setStringValue(String stringValue) {
+ set(new Path(stringValue));
+ }
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/driver/MahoutDriver.java
----------------------------------------------------------------------
diff --git a/mr/src/main/java/org/apache/mahout/driver/MahoutDriver.java b/mr/src/main/java/org/apache/mahout/driver/MahoutDriver.java
new file mode 100644
index 0000000..1fd5506
--- /dev/null
+++ b/mr/src/main/java/org/apache/mahout/driver/MahoutDriver.java
@@ -0,0 +1,244 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.driver;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.io.Closeables;
+import org.apache.hadoop.util.ProgramDriver;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * General-purpose driver class for Mahout programs. Utilizes org.apache.hadoop.util.ProgramDriver to run
+ * main methods of other classes, but first loads up default properties from a properties file.
+ * <p/>
+ * To run locally:
+ *
+ * <pre>$MAHOUT_HOME/bin/mahout run shortJobName [over-ride ops]</pre>
+ * <p/>
+ * Works like this: by default, the file "driver.classes.props" is loaded from the classpath, which
+ * defines a mapping between short names like "vectordump" and fully qualified class names.
+ * The format of driver.classes.props is like so:
+ * <p/>
+ *
+ * <pre>fully.qualified.class.name = shortJobName : descriptive string</pre>
+ * <p/>
+ * The default properties to be applied to the program run is pulled out of, by default, "<shortJobName>.props"
+ * (also off of the classpath).
+ * <p/>
+ * The format of the default properties files is as follows:
+ * <pre>
+ i|input = /path/to/my/input
+ o|output = /path/to/my/output
+ m|jarFile = /path/to/jarFile
+ # etc - each line is shortArg|longArg = value
+ </pre>
+ *
+ * The next argument to the Driver is supposed to be the short name of the class to be run (as defined in the
+ * driver.classes.props file).
+ * <p/>
+ * Then the class which will be run will have its main called with
+ *
+ * <pre>main(new String[] { "--input", "/path/to/my/input", "--output", "/path/to/my/output" });</pre>
+ *
+ * After all the "default" properties are loaded from the file, any further command-line arguments are taken in,
+ * and over-ride the defaults.
+ * <p/>
+ * So if your driver.classes.props looks like so:
+ *
+ * <pre>org.apache.mahout.utils.vectors.VectorDumper = vecDump : dump vectors from a sequence file</pre>
+ *
+ * and you have a file core/src/main/resources/vecDump.props which looks like
+ * <pre>
+ o|output = /tmp/vectorOut
+ s|seqFile = /my/vector/sequenceFile
+ </pre>
+ *
+ * And you execute the command-line:
+ *
+ * <pre>$MAHOUT_HOME/bin/mahout run vecDump -s /my/otherVector/sequenceFile</pre>
+ *
+ * Then org.apache.mahout.utils.vectors.VectorDumper.main() will be called with arguments:
+ * <pre>{"--output", "/tmp/vectorOut", "-s", "/my/otherVector/sequenceFile"}</pre>
+ */
+ public final class MahoutDriver {
+
+   private static final Logger log = LoggerFactory.getLogger(MahoutDriver.class);
+
+   private MahoutDriver() {
+   }
+
+   /**
+    * Registers the programs listed in the driver properties, merges the program's default
+    * properties with the command line and hands the result to {@link ProgramDriver}.
+    *
+    * @param args program short name (or class name) followed by its arguments
+    * @throws IOException if neither driver properties file can be loaded
+    * @throws Throwable   whatever the invoked program throws
+    */
+   public static void main(String[] args) throws Throwable {
+
+     Properties mainClasses = loadProperties("driver.classes.props");
+     if (mainClasses == null) {
+       mainClasses = loadProperties("driver.classes.default.props");
+     }
+     if (mainClasses == null) {
+       throw new IOException("Can't load any properties file?");
+     }
+
+     boolean foundShortName = false;
+     ProgramDriver programDriver = new ProgramDriver();
+     for (String keyString : mainClasses.stringPropertyNames()) {
+       String valueString = mainClasses.getProperty(keyString);
+       if (args.length > 0 && shortName(valueString).equals(args[0])) {
+         foundShortName = true;
+       }
+       boolean deprecated = isDeprecated(mainClasses, keyString);
+       if (deprecated && args.length > 0 && keyString.equalsIgnoreCase(args[0])) {
+         // the description of a deprecated entry is its deprecation message
+         log.error(desc(valueString));
+         return;
+       }
+       if (deprecated) {
+         continue;
+       }
+       addClass(programDriver, keyString, valueString);
+     }
+
+     if (args.length < 1 || args[0] == null || "-h".equals(args[0]) || "--help".equals(args[0])) {
+       programDriver.driver(args);
+       return;
+     }
+
+     String progName = args[0];
+     if (!foundShortName) {
+       // not a registered short name: treat it as a class name
+       addClass(programDriver, progName, progName);
+     }
+     shift(args);
+
+     Properties mainProps = loadProperties(progName + ".props");
+     if (mainProps == null) {
+       log.warn("No {}.props found on classpath, will use command-line arguments only", progName);
+       mainProps = new Properties();
+     }
+
+     Map<String,String[]> argMap = parseCommandLine(args);
+     addDefaults(argMap, mainProps);
+     List<String> argsList = buildArguments(progName, argMap);
+
+     long start = System.currentTimeMillis();
+
+     programDriver.driver(argsList.toArray(new String[argsList.size()]));
+
+     if (log.isInfoEnabled()) {
+       long elapsedMs = System.currentTimeMillis() - start;
+       log.info("Program took {} ms (Minutes: {})", elapsedMs, elapsedMs / 60000.0);
+     }
+   }
+
+   /**
+    * Groups the (already shifted, null-terminated) command line into option -> values.
+    * Insertion order is kept so options are forwarded in the order they were given.
+    */
+   private static Map<String,String[]> parseCommandLine(String[] args) {
+     Map<String,String[]> argMap = Maps.newLinkedHashMap();
+     int i = 0;
+     while (i < args.length && args[i] != null) {
+       List<String> argValues = Lists.newArrayList();
+       String arg = args[i];
+       i++;
+       if (arg.startsWith("-D")) { // '-Dkey=value' or '-Dkey=value1,value2,etc' case
+         // limit 2: only the first '=' separates key and value, so values may contain '='
+         String[] argSplit = arg.split("=", 2);
+         arg = argSplit[0];
+         if (argSplit.length == 2) {
+           argValues.add(argSplit[1]);
+         }
+       } else { // '-key [values]' or '--key [values]' case.
+         while (i < args.length && args[i] != null) {
+           if (args[i].startsWith("-")) {
+             break;
+           }
+           argValues.add(args[i]);
+           i++;
+         }
+       }
+       argMap.put(arg, argValues.toArray(new String[argValues.size()]));
+     }
+     return argMap;
+   }
+
+   /** Adds properties from the .props file that are not overridden on the command line. */
+   private static void addDefaults(Map<String,String[]> argMap, Properties mainProps) {
+     for (String key : mainProps.stringPropertyNames()) {
+       String[] argNamePair = key.split("\\|");
+       String shortArg = '-' + argNamePair[0].trim();
+       String longArg = argNamePair.length < 2 ? null : "--" + argNamePair[1].trim();
+       if (!argMap.containsKey(shortArg) && (longArg == null || !argMap.containsKey(longArg))) {
+         // A key without a '|long' part has no long form; fall back to the short form
+         // instead of storing a null key (which failed later when the key was inspected).
+         String argName = longArg == null ? shortArg : longArg;
+         argMap.put(argName, new String[] {mainProps.getProperty(key)});
+       }
+     }
+   }
+
+   /** Flattens the option map into the argument list: program name, then -D options, then the rest. */
+   private static List<String> buildArguments(String progName, Map<String,String[]> argMap) {
+     List<String> argsList = Lists.newArrayList();
+     argsList.add(progName);
+     for (Map.Entry<String,String[]> entry : argMap.entrySet()) {
+       String arg = entry.getKey();
+       String[] argValues = entry.getValue();
+       if (arg.startsWith("-D")) { // arg is -Dkey - if value for this !isEmpty(), then arg -> -Dkey + "=" + value
+         if (argValues.length > 0 && !argValues[0].trim().isEmpty()) {
+           arg += '=' + argValues[0].trim();
+         }
+         // -D options go directly after the program name, ahead of all other options
+         argsList.add(1, arg);
+       } else {
+         argsList.add(arg);
+         for (String argValue : argValues) {
+           if (!argValue.isEmpty()) {
+             argsList.add(argValue);
+           }
+         }
+       }
+     }
+     return argsList;
+   }
+
+   /** @return true if the entry's short name is "deprecated" (case-insensitive) */
+   private static boolean isDeprecated(Properties mainClasses, String keyString) {
+     return "deprecated".equalsIgnoreCase(shortName(mainClasses.getProperty(keyString)));
+   }
+
+   /** @return the properties loaded from the named classpath resource, or null if the resource is absent */
+   private static Properties loadProperties(String resource) throws IOException {
+     InputStream propsStream = Thread.currentThread().getContextClassLoader().getResourceAsStream(resource);
+     if (propsStream != null) {
+       try {
+         Properties properties = new Properties();
+         properties.load(propsStream);
+         return properties;
+       } finally {
+         Closeables.close(propsStream, true);
+       }
+     }
+     return null;
+   }
+
+   /** Drops the first element in place; the freed last slot is set to null. */
+   private static String[] shift(String[] args) {
+     System.arraycopy(args, 1, args, 0, args.length - 1);
+     args[args.length - 1] = null;
+     return args;
+   }
+
+   /** @return the trimmed text before the first ':', or the whole string when there is no ':' */
+   private static String shortName(String valueString) {
+     int colon = valueString.indexOf(':');
+     return colon >= 0 ? valueString.substring(0, colon).trim() : valueString;
+   }
+
+   /** @return the trimmed text after the first ':' (the colon itself excluded), or the whole string when there is no ':' */
+   private static String desc(String valueString) {
+     int colon = valueString.indexOf(':');
+     return colon >= 0 ? valueString.substring(colon + 1).trim() : valueString;
+   }
+
+   /** Registers a class with the driver; failures are logged and skipped so one bad entry does not stop the driver. */
+   private static void addClass(ProgramDriver driver, String classString, String descString) {
+     try {
+       Class<?> clazz = Class.forName(classString);
+       driver.addClass(shortName(descString), clazz, desc(descString));
+     } catch (Throwable t) {
+       log.warn("Unable to add class: {}", classString, t);
+     }
+   }
+
+ }
http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/ep/EvolutionaryProcess.java
----------------------------------------------------------------------
diff --git a/mr/src/main/java/org/apache/mahout/ep/EvolutionaryProcess.java b/mr/src/main/java/org/apache/mahout/ep/EvolutionaryProcess.java
new file mode 100644
index 0000000..b744287
--- /dev/null
+++ b/mr/src/main/java/org/apache/mahout/ep/EvolutionaryProcess.java
@@ -0,0 +1,228 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.ep;
+
+import com.google.common.collect.Lists;
+import org.apache.hadoop.io.Writable;
+import org.apache.mahout.classifier.sgd.PolymorphicWritable;
+
+import java.io.Closeable;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Allows evolutionary optimization where the state function can't be easily
+ * packaged for the optimizer to execute. A good example of this is with
+ * on-line learning where optimizing the learning parameters is desirable.
+ * We would like to pass training examples to the learning algorithms, but
+ * we definitely want to do the training in multiple threads and then after
+ * several training steps, we want to do a selection and mutation step.
+ *
+ * In such a case, it is highly desirable to leave most of the control flow
+ * in the hands of our caller. As such, this class provides three functions,
+ * <ul>
+ * <li> Storage of the evolutionary state. The state variables have payloads
+ * which can be anything that implements Payload.
+ * <li> Threaded execution of a single operation on each of the members of the
+ * population being evolved. In the on-line learning example, this is used for
+ * training all of the classifiers in the population.
+ * <li> Propagating mutations of the most successful members of the population.
+ * This propagation involves copying the state and the payload and then updating
+ * the payload after mutation of the evolutionary state.
+ * </ul>
+ *
+ * The State class that we use for storing the state of each member of the
+ * population also provides parameter mapping. Check out Mapping and State
+ * for more info.
+ *
+ * @see Mapping
+ * @see Payload
+ * @see State
+ *
+ * @param <T> The payload class.
+ */
+ public class EvolutionaryProcess<T extends Payload<U>, U> implements Writable, Closeable {
+   // used to execute operations on the population in thread parallel.
+   // Stays null on a default-constructed instance until setThreadCount/readFields is called.
+   private ExecutorService pool;
+
+   // threadCount is serialized so that we can reconstruct the thread pool
+   private int threadCount;
+
+   // list of members of the population
+   private List<State<T, U>> population;
+
+   // how big should the population be. If this is changed, it will take effect
+   // the next time the population is mutated.
+   private int populationSize;
+
+   /** Creates an empty process without a thread pool; intended to be filled through {@link #readFields(DataInput)}. */
+   public EvolutionaryProcess() {
+     population = Lists.newArrayList();
+   }
+
+   /**
+    * Creates an evolutionary optimization framework with specified threadiness,
+    * population size and initial state.
+    * @param threadCount How many threads to use in parallelDo
+    * @param populationSize How large a population to use
+    * @param seed An initial population member
+    */
+   public EvolutionaryProcess(int threadCount, int populationSize, State<T, U> seed) {
+     this.populationSize = populationSize;
+     setThreadCount(threadCount);
+     initializePopulation(populationSize, seed);
+   }
+
+   /** Starts the population with the seed itself followed by {@code populationSize} mutations of it. */
+   private void initializePopulation(int populationSize, State<T, U> seed) {
+     population = Lists.newArrayList(seed);
+     for (int i = 0; i < populationSize; i++) {
+       population.add(seed.mutate());
+     }
+   }
+
+   /** Appends a member to the population. */
+   public void add(State<T, U> value) {
+     population.add(value);
+   }
+
+   /**
+    * Nuke all but a few of the current population and then repopulate with
+    * variants of the survivors.
+    * @param survivors How many survivors we want to keep.
+    */
+   public void mutatePopulation(int survivors) {
+     // largest value first, oldest first in case of ties
+     Collections.sort(population);
+
+     // we copy here to avoid concurrent modification
+     List<State<T, U>> parents = Lists.newArrayList(population.subList(0, survivors));
+     population.subList(survivors, population.size()).clear();
+
+     // fill out the population with offspring from the survivors
+     int i = 0;
+     while (population.size() < populationSize) {
+       population.add(parents.get(i % survivors).mutate());
+       i++;
+     }
+   }
+
+   /**
+    * Execute an operation on all of the members of the population with many threads. The
+    * return value is taken as the current fitness of the corresponding member.
+    * @param fn What to do on each member. Gets payload and the mapped parameters as args.
+    * @return The member of the population with the best fitness.
+    * @throws InterruptedException Shouldn't happen.
+    * @throws ExecutionException If fn throws an exception, that exception will be collected
+    * and rethrown nested in an ExecutionException.
+    */
+   public State<T, U> parallelDo(final Function<Payload<U>> fn) throws InterruptedException, ExecutionException {
+     Collection<Callable<State<T, U>>> tasks = Lists.newArrayList();
+     for (final State<T, U> state : population) {
+       tasks.add(new Callable<State<T, U>>() {
+         @Override
+         public State<T, U> call() {
+           double v = fn.apply(state.getPayload(), state.getMappedParams());
+           state.setValue(v);
+           return state;
+         }
+       });
+     }
+
+     List<Future<State<T, U>>> r = pool.invokeAll(tasks);
+
+     // zip through the results and find the best one
+     double max = Double.NEGATIVE_INFINITY;
+     State<T, U> best = null;
+     for (Future<State<T, U>> future : r) {
+       State<T, U> s = future.get();
+       double value = s.getValue();
+       if (!Double.isNaN(value) && value >= max) {
+         max = value;
+         best = s;
+       }
+     }
+     if (best == null) {
+       // every fitness was NaN: fall back to the first member
+       best = r.get(0).get();
+     }
+
+     return best;
+   }
+
+   /**
+    * Replaces the worker pool with a fixed pool of the given size. A previously created pool
+    * is shut down (already submitted work may finish) so its threads are not leaked.
+    *
+    * @param threadCount number of worker threads
+    */
+   public void setThreadCount(int threadCount) {
+     ExecutorService previous = pool;
+     // create the replacement first so an invalid count leaves the old pool in place
+     ExecutorService replacement = Executors.newFixedThreadPool(threadCount);
+     this.threadCount = threadCount;
+     pool = replacement;
+     if (previous != null) {
+       previous.shutdown();
+     }
+   }
+
+   public int getThreadCount() {
+     return threadCount;
+   }
+
+   public int getPopulationSize() {
+     return populationSize;
+   }
+
+   public List<State<T, U>> getPopulation() {
+     return population;
+   }
+
+   /**
+    * Stops the worker pool and waits up to ten seconds for running tasks to end.
+    * Does nothing if no pool was ever created.
+    *
+    * @throws IllegalStateException if tasks were still queued, or if the wait was interrupted
+    */
+   @Override
+   public void close() {
+     if (pool == null) {
+       return;
+     }
+     List<Runnable> remainingTasks = pool.shutdownNow();
+     try {
+       pool.awaitTermination(10, TimeUnit.SECONDS);
+     } catch (InterruptedException e) {
+       // keep the interrupt visible to callers and preserve the cause
+       Thread.currentThread().interrupt();
+       throw new IllegalStateException("Had to forcefully shut down " + remainingTasks.size() + " tasks", e);
+     }
+     if (!remainingTasks.isEmpty()) {
+       throw new IllegalStateException("Had to forcefully shut down " + remainingTasks.size() + " tasks");
+     }
+   }
+
+   /** Operation applied to every population member by {@link #parallelDo(Function)}; returns the member's fitness. */
+   public interface Function<T> {
+     double apply(T payload, double[] params);
+   }
+
+   /** Writes the thread count, the number of members and then each member. */
+   @Override
+   public void write(DataOutput out) throws IOException {
+     out.writeInt(threadCount);
+     out.writeInt(population.size());
+     for (State<T, U> state : population) {
+       PolymorphicWritable.write(out, state);
+     }
+   }
+
+   /**
+    * Restores the thread count (re-creating the pool) and the population written by
+    * {@link #write(DataOutput)}.
+    */
+   @Override
+   public void readFields(DataInput input) throws IOException {
+     // NOTE(review): populationSize is neither written nor restored, so a default-constructed
+     // instance still has populationSize == 0 after this call - confirm that is intended.
+     setThreadCount(input.readInt());
+     int n = input.readInt();
+     population = Lists.newArrayList();
+     for (int i = 0; i < n; i++) {
+       // the stream carries no generic type information, so the cast cannot be checked
+       @SuppressWarnings("unchecked")
+       State<T, U> state = (State<T, U>) PolymorphicWritable.read(input, State.class);
+       population.add(state);
+     }
+   }
+ }
http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/ep/Mapping.java
----------------------------------------------------------------------
diff --git a/mr/src/main/java/org/apache/mahout/ep/Mapping.java b/mr/src/main/java/org/apache/mahout/ep/Mapping.java
new file mode 100644
index 0000000..41a8942
--- /dev/null
+++ b/mr/src/main/java/org/apache/mahout/ep/Mapping.java
@@ -0,0 +1,206 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.ep;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.io.Writable;
+import org.apache.mahout.classifier.sgd.PolymorphicWritable;
+import org.apache.mahout.math.function.DoubleFunction;
+
+/**
+ * Provides coordinate transformations so that evolution can proceed on the entire space of
+ * reals but have the output limited and squished in convenient (and safe) ways.
+ */
+public abstract class Mapping extends DoubleFunction implements Writable {
+
+  // Private so that the only possible subclasses are the nested mappings declared in this file.
+  private Mapping() {
+  }
+
+  /**
+   * Logistic squashing of the reals into the interval between {@code min} and {@code max}:
+   * {@code min + (max - min) / (1 + exp(-v * scale))}.
+   */
+  public static final class SoftLimit extends Mapping {
+    private double min;
+    private double max;
+    private double scale;
+
+    /**
+     * Creates an instance with all fields zero; {@link #readFields(DataInput)} fills them in.
+     */
+    public SoftLimit() {
+    }
+
+    private SoftLimit(double min, double max, double scale) {
+      this.min = min;
+      this.max = max;
+      this.scale = scale;
+    }
+
+    @Override
+    public double apply(double v) {
+      return min + (max - min) / (1 + Math.exp(-v * scale));
+    }
+
+    /** Writes {@code min}, {@code max} and {@code scale}, in that order, as doubles. */
+    @Override
+    public void write(DataOutput out) throws IOException {
+      out.writeDouble(min);
+      out.writeDouble(max);
+      out.writeDouble(scale);
+    }
+
+    /** Reads {@code min}, {@code max} and {@code scale}, in that order, as doubles. */
+    @Override
+    public void readFields(DataInput in) throws IOException {
+      min = in.readDouble();
+      max = in.readDouble();
+      scale = in.readDouble();
+    }
+  }
+
+  /**
+   * Applies a soft limit in log space and exponentiates the result, so that outputs lie between
+   * the {@code low} and {@code high} bounds given at construction.
+   */
+  public static final class LogLimit extends Mapping {
+    // Soft limit operating on the logarithms of the bounds.
+    private Mapping wrapped;
+
+    /**
+     * Creates an instance without a wrapped mapping; {@link #readFields(DataInput)} supplies it.
+     */
+    public LogLimit() {
+    }
+
+    private LogLimit(double low, double high) {
+      wrapped = softLimit(Math.log(low), Math.log(high));
+    }
+
+    @Override
+    public double apply(double v) {
+      return Math.exp(wrapped.apply(v));
+    }
+
+    @Override
+    public void write(DataOutput dataOutput) throws IOException {
+      PolymorphicWritable.write(dataOutput, wrapped);
+    }
+
+    @Override
+    public void readFields(DataInput in) throws IOException {
+      wrapped = PolymorphicWritable.read(in, Mapping.class);
+    }
+  }
+
+  /**
+   * Maps {@code v} to {@code exp(v * scale)}.
+   */
+  public static final class Exponential extends Mapping {
+    private double scale;
+
+    /**
+     * Creates an instance with a zero scale; {@link #readFields(DataInput)} fills it in.
+     */
+    public Exponential() {
+    }
+
+    private Exponential(double scale) {
+      this.scale = scale;
+    }
+
+    @Override
+    public double apply(double v) {
+      return Math.exp(v * scale);
+    }
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+      out.writeDouble(scale);
+    }
+
+    @Override
+    public void readFields(DataInput in) throws IOException {
+      scale = in.readDouble();
+    }
+  }
+
+  /**
+   * Returns its argument unchanged. It has no state, so serialization reads and writes nothing.
+   */
+  public static final class Identity extends Mapping {
+    @Override
+    public double apply(double v) {
+      return v;
+    }
+
+    @Override
+    public void write(DataOutput dataOutput) {
+      // stateless
+    }
+
+    @Override
+    public void readFields(DataInput dataInput) {
+      // stateless
+    }
+  }
+
+  /**
+   * Maps input to the open interval (min, max) with 0 going to the mean of min and
+   * max. When scale is large, a larger proportion of values are mapped to points
+   * near the boundaries. When scale is small, a larger proportion of values are mapped to
+   * points well within the boundaries.
+   * @param min The largest lower bound on values to be returned.
+   * @param max The least upper bound on values to be returned.
+   * @param scale Defines how sharp the boundaries are.
+   * @return A mapping that satisfies the desired constraint.
+   */
+  public static Mapping softLimit(double min, double max, double scale) {
+    return new SoftLimit(min, max, scale);
+  }
+
+  /**
+   * Maps input to the open interval (min, max) with 0 going to the mean of min and
+   * max. Equivalent to {@link #softLimit(double, double, double)} with a scale of 1.
+   * @see #softLimit(double, double, double)
+   * @param min The largest lower bound on values to be returned.
+   * @param max The least upper bound on values to be returned.
+   * @return A mapping that satisfies the desired constraint.
+   */
+  public static Mapping softLimit(double min, double max) {
+    return softLimit(min, max, 1);
+  }
+
+  /**
+   * Maps input to positive values in the open interval (low, high) with
+   * 0 going to the geometric mean. Near the geometric mean, values are
+   * distributed roughly geometrically.
+   * @param low The largest lower bound for output results. Must be &gt; 0.
+   * @param high The least upper bound for output results. Must be &gt; 0.
+   * @return A mapping that satisfies the desired constraint.
+   * @throws IllegalArgumentException if either bound is not strictly positive.
+   */
+  public static Mapping logLimit(double low, double high) {
+    // Guava's Preconditions templates only substitute %s; other specifiers are left verbatim.
+    Preconditions.checkArgument(low > 0, "Lower bound for log limit must be > 0 but was %s", low);
+    Preconditions.checkArgument(high > 0, "Upper bound for log limit must be > 0 but was %s", high);
+    return new LogLimit(low, high);
+  }
+
+  /**
+   * Maps results to positive values using {@code exp(v)}.
+   * @return A mapping whose outputs are positive.
+   */
+  public static Mapping exponential() {
+    return exponential(1);
+  }
+
+  /**
+   * Maps results to positive values using {@code exp(v * scale)}.
+   * @param scale If large, then large values are more likely.
+   * @return A mapping whose outputs are positive.
+   */
+  public static Mapping exponential(double scale) {
+    return new Exponential(scale);
+  }
+
+  /**
+   * Maps results to themselves.
+   * @return A mapping that returns its input unchanged.
+   */
+  public static Mapping identity() {
+    return new Identity();
+  }
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/ep/Payload.java
----------------------------------------------------------------------
diff --git a/mr/src/main/java/org/apache/mahout/ep/Payload.java b/mr/src/main/java/org/apache/mahout/ep/Payload.java
new file mode 100644
index 0000000..920237d
--- /dev/null
+++ b/mr/src/main/java/org/apache/mahout/ep/Payload.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.ep;
+
+import org.apache.hadoop.io.Writable;
+
+/**
+ * Payloads for evolutionary state must be copyable and updatable. The copy should be a deep copy
+ * unless some aspect of the state is sharable or immutable.
+ * <p/>
+ * During mutation, a copy is first made and then after the parameters in the State structure are
+ * suitably modified, update is called with the scaled versions of the parameters.
+ * <ul>
+ * <li>{@code copy()} returns the duplicate that the copied or mutated state will hold.</li>
+ * <li>{@code update(double[])} receives the mapped parameter values of the mutated state.</li>
+ * </ul>
+ *
+ * @param <T> type argument that appears only in the return type of {@code copy()}; this interface
+ *            places no other constraint on it
+ * @see State
+ */
+public interface Payload<T> extends Writable {
+ Payload<T> copy();
+
+ void update(double[] params);
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/ep/State.java
----------------------------------------------------------------------
diff --git a/mr/src/main/java/org/apache/mahout/ep/State.java b/mr/src/main/java/org/apache/mahout/ep/State.java
new file mode 100644
index 0000000..7a0fb5e
--- /dev/null
+++ b/mr/src/main/java/org/apache/mahout/ep/State.java
@@ -0,0 +1,302 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.ep;
+
+import com.google.common.collect.Lists;
+import org.apache.hadoop.io.Writable;
+import org.apache.mahout.classifier.sgd.PolymorphicWritable;
+import org.apache.mahout.common.RandomUtils;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Locale;
+import java.util.Random;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Records evolutionary state and provides a mutation operation for recorded-step meta-mutation.
+ *
+ * You provide the payload, this class provides the mutation operations. During mutation,
+ * the payload is copied and after the state variables are changed, they are passed to the
+ * payload.
+ *
+ * Parameters are internally mutated in a state space that spans all of R^n, but parameters
+ * passed to the payload are transformed as specified by a call to setMap(). The default
+ * mapping is the identity map, but uniform-ish or exponential-ish coverage of a range are
+ * also supported.
+ *
+ * More information on the underlying algorithm can be found in the following paper
+ *
+ * http://arxiv.org/abs/0803.3838
+ *
+ * @see Mapping
+ */
+public class State<T extends Payload<U>, U> implements Comparable<State<T, U>>, Writable {
+
+  // object count is kept to break ties in comparison.
+  private static final AtomicInteger OBJECT_COUNT = new AtomicInteger();
+
+  // Assigned from the global counter whenever an instance is constructed, including copies.
+  private int id = OBJECT_COUNT.getAndIncrement();
+  private Random gen = RandomUtils.getRandom();
+  // current state
+  private double[] params;
+  // mappers to transform state; a null entry means "use the raw parameter" (see get(int))
+  private Mapping[] maps;
+  // omni-directional mutation
+  private double omni;
+  // directional mutation
+  private double[] step;
+  // current fitness value
+  private double value;
+  private T payload;
+
+  /**
+   * Creates an empty state whose arrays and payload are unset. The remaining fields are filled in
+   * by {@link #copy()} or {@link #readFields(DataInput)}.
+   */
+  public State() {
+  }
+
+  /**
+   * Invent a new state with no momentum (yet).
+   *
+   * @param x0   initial parameter values; the array is copied
+   * @param omni initial magnitude of the omni-directional mutation
+   */
+  public State(double[] x0, double omni) {
+    params = Arrays.copyOf(x0, x0.length);
+    this.omni = omni;
+    step = new double[params.length];
+    maps = new Mapping[params.length];
+  }
+
+  /**
+   * Deep copies a state, useful in mutation. The parameter and step arrays are duplicated, the
+   * mapping array is duplicated but its {@link Mapping} elements are shared, the payload (if any)
+   * is duplicated through {@link Payload#copy()} and the random generator is shared. The copy
+   * receives a fresh id and does not inherit the fitness value.
+   *
+   * @return the new state
+   */
+  public State<T, U> copy() {
+    State<T, U> r = new State<>();
+    r.params = Arrays.copyOf(this.params, this.params.length);
+    r.omni = this.omni;
+    r.step = Arrays.copyOf(this.step, this.step.length);
+    r.maps = Arrays.copyOf(this.maps, this.maps.length);
+    if (this.payload != null) {
+      r.payload = (T) this.payload.copy();
+    }
+    r.gen = this.gen;
+    return r;
+  }
+
+  /**
+   * Clones this state with a random change in position. Copies the payload and
+   * lets it know about the change.
+   *
+   * @return A new state.
+   */
+  public State<T, U> mutate() {
+    // Euclidean length of the previous step.
+    double sum = 0;
+    for (double v : step) {
+      sum += v * v;
+    }
+    sum = Math.sqrt(sum);
+    // Random factor applied to the previous step when forming the directional part of the move.
+    double lambda = 1 + gen.nextGaussian();
+
+    State<T, U> r = this.copy();
+    double magnitude = 0.9 * omni + sum / 10;
+    r.omni = magnitude * -Math.log1p(-gen.nextDouble());
+    for (int i = 0; i < step.length; i++) {
+      r.step[i] = lambda * step[i] + r.omni * gen.nextGaussian();
+      r.params[i] += r.step[i];
+    }
+    if (this.payload != null) {
+      r.payload.update(r.getMappedParams());
+    }
+    return r;
+  }
+
+  /**
+   * Defines the transformation for a parameter.
+   * @param i Which parameter's mapping to define.
+   * @param m The mapping to use.
+   * @see org.apache.mahout.ep.Mapping
+   */
+  public void setMap(int i, Mapping m) {
+    maps[i] = m;
+  }
+
+  /**
+   * Returns a transformed parameter.
+   * @param i The parameter to return.
+   * @return The value of the parameter, passed through its mapping if one is set.
+   */
+  public double get(int i) {
+    Mapping m = maps[i];
+    return m == null ? params[i] : m.apply(params[i]);
+  }
+
+  public int getId() {
+    return id;
+  }
+
+  /**
+   * @return the internal (unmapped) parameter array itself, not a copy
+   */
+  public double[] getParams() {
+    return params;
+  }
+
+  /**
+   * @return the internal mapping array itself, not a copy
+   */
+  public Mapping[] getMaps() {
+    return maps;
+  }
+
+  /**
+   * Returns all the parameters in mapped form.
+   * @return A newly allocated array of parameters.
+   */
+  public double[] getMappedParams() {
+    double[] r = new double[params.length];
+    for (int i = 0; i < r.length; i++) {
+      r[i] = get(i);
+    }
+    return r;
+  }
+
+  public double getOmni() {
+    return omni;
+  }
+
+  /**
+   * @return the internal step array itself, not a copy
+   */
+  public double[] getStep() {
+    return step;
+  }
+
+  public T getPayload() {
+    return payload;
+  }
+
+  public double getValue() {
+    return value;
+  }
+
+  public void setOmni(double omni) {
+    this.omni = omni;
+  }
+
+  public void setId(int id) {
+    this.id = id;
+  }
+
+  public void setStep(double[] step) {
+    this.step = step;
+  }
+
+  public void setMaps(Mapping[] maps) {
+    this.maps = maps;
+  }
+
+  /**
+   * Replaces the mappings with the elements of {@code maps}, in iteration order.
+   *
+   * @param maps the mappings to install
+   */
+  public void setMaps(Iterable<Mapping> maps) {
+    Collection<Mapping> list = Lists.newArrayList(maps);
+    this.maps = list.toArray(new Mapping[list.size()]);
+  }
+
+  public void setValue(double v) {
+    value = v;
+  }
+
+  public void setPayload(T payload) {
+    this.payload = payload;
+  }
+
+  /**
+   * Two states are equal when they have the same id and the same fitness value. Values are
+   * compared with {@link Double#compare(double, double)}, the same comparison used by
+   * {@link #compareTo(State)}, so a state whose value is NaN is still equal to itself.
+   */
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (!(o instanceof State)) {
+      return false;
+    }
+    State<?, ?> other = (State<?, ?>) o;
+    return id == other.id && Double.compare(value, other.value) == 0;
+  }
+
+  // NOTE(review): presumably RandomUtils.hashDouble hashes the bit pattern of the double, which
+  // would match the Double.compare-based equality used by equals - confirm.
+  @Override
+  public int hashCode() {
+    return RandomUtils.hashDouble(value) ^ id;
+  }
+
+  /**
+   * Natural order is to sort in descending order of score. Creation order is used as a
+   * tie-breaker.
+   *
+   * @param other The state to compare with.
+   * @return A negative number, 0 or a positive number if this state sorts before, equal to or
+   *         after the other one; higher values sort first and lower ids win ties.
+   */
+  @Override
+  public int compareTo(State<T, U> other) {
+    // Arguments are swapped on purpose to obtain descending order of value.
+    int r = Double.compare(other.value, this.value);
+    if (r != 0) {
+      return r;
+    }
+    if (this.id < other.id) {
+      return -1;
+    }
+    if (this.id > other.id) {
+      return 1;
+    }
+    return 0;
+  }
+
+  @Override
+  public String toString() {
+    double sum = 0;
+    for (double v : step) {
+      sum += v * v;
+    }
+    return String.format(Locale.ENGLISH, "<S/%s %.3f %.3f>", payload, omni + Math.sqrt(sum), value);
+  }
+
+  /**
+   * Writes, in order: id, parameter count, parameters, one mapping per entry of {@code maps},
+   * omni, the step values, the fitness value and finally the payload.
+   *
+   * @param out sink that receives the encoded state
+   * @throws IOException if writing to {@code out} fails
+   */
+  @Override
+  public void write(DataOutput out) throws IOException {
+    out.writeInt(id);
+    out.writeInt(params.length);
+    for (double v : params) {
+      out.writeDouble(v);
+    }
+    // NOTE(review): entries of maps and the payload may be null elsewhere in this class;
+    // presumably PolymorphicWritable.write cannot serialize null - confirm before relying on it.
+    for (Mapping map : maps) {
+      PolymorphicWritable.write(out, map);
+    }
+
+    out.writeDouble(omni);
+    for (double v : step) {
+      out.writeDouble(v);
+    }
+
+    out.writeDouble(value);
+    PolymorphicWritable.write(out, payload);
+  }
+
+  /**
+   * Restores the state from the layout produced by {@link #write(DataOutput)}. The parameter
+   * count read from the stream sizes the parameter, mapping and step arrays.
+   *
+   * @param input source of the encoded state
+   * @throws IOException if reading from {@code input} fails
+   */
+  @Override
+  public void readFields(DataInput input) throws IOException {
+    id = input.readInt();
+    int n = input.readInt();
+    params = new double[n];
+    for (int i = 0; i < n; i++) {
+      params[i] = input.readDouble();
+    }
+
+    maps = new Mapping[n];
+    for (int i = 0; i < n; i++) {
+      maps[i] = PolymorphicWritable.read(input, Mapping.class);
+    }
+    omni = input.readDouble();
+    step = new double[n];
+    for (int i = 0; i < n; i++) {
+      step[i] = input.readDouble();
+    }
+    value = input.readDouble();
+    payload = (T) PolymorphicWritable.read(input, Payload.class);
+  }
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/ep/package-info.java
----------------------------------------------------------------------
diff --git a/mr/src/main/java/org/apache/mahout/ep/package-info.java b/mr/src/main/java/org/apache/mahout/ep/package-info.java
new file mode 100644
index 0000000..4afe677
--- /dev/null
+++ b/mr/src/main/java/org/apache/mahout/ep/package-info.java
@@ -0,0 +1,26 @@
+/**
+ * <p>Provides basic evolutionary optimization using <a href="http://arxiv.org/abs/0803.3838">recorded-step</a>
+ * mutation.</p>
+ *
+ * <p>With this style of optimization, we can optimize a function {@code f: R^n -> R} by stochastic
+ * hill-climbing with some of the benefits of conjugate gradient style history encoded in the mutation function.
+ * This mutation function will adapt to allow weakly directed search rather than using the somewhat more
+ * conventional symmetric Gaussian.</p>
+ *
+ * <p>With recorded-step mutation, the meta-mutation parameters are all auto-encoded in the current state of each point.
+ * This avoids the classic problem of having more mutation rate parameters than are in the original state and then
+ * requiring even more parameters to describe the meta-mutation rate. Instead, we store the previous point and one
+ * omni-directional mutation component. Mutation is performed by first mutating along the line formed by the previous
+ * and current points and then adding a scaled symmetric Gaussian. The magnitude of the omni-directional mutation is
+ * then mutated using itself as a scale.</p>
+ *
+ * <p>Because it is convenient to not restrict the parameter space, this package also provides convenient parameter
+ * mapping methods. These mapping methods map the set of reals to a finite open interval (a,b) in such a way that
+ * {@code lim_{x->-\inf} f(x) = a} and {@code lim_{x->\inf} f(x) = b}. The linear mapping is defined so that
+ * {@code f(0) = (a+b)/2} and the exponential mapping requires that a and b are both positive and has
+ * {@code f(0) = sqrt(ab)}. The linear mapping is useful for values that must stay roughly within a range but
+ * which are roughly uniform within the center of that range. The exponential
+ * mapping is useful for values that must stay within a range but whose distribution is roughly exponential near
+ * the geometric mean of the end-points. An identity mapping is also supplied.</p>
+ */
+package org.apache.mahout.ep;
http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/math/DistributedRowMatrixWriter.java
----------------------------------------------------------------------
diff --git a/mr/src/main/java/org/apache/mahout/math/DistributedRowMatrixWriter.java b/mr/src/main/java/org/apache/mahout/math/DistributedRowMatrixWriter.java
new file mode 100644
index 0000000..6618a1a
--- /dev/null
+++ b/mr/src/main/java/org/apache/mahout/math/DistributedRowMatrixWriter.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.mahout.math;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.SequenceFile;
+
+import java.io.IOException;
+
+public final class DistributedRowMatrixWriter {
+
+  // Static utility holder; not meant to be instantiated.
+  private DistributedRowMatrixWriter() {
+  }
+
+  /**
+   * Writes the slices of a matrix to a sequence file of {@link IntWritable} keys and
+   * {@link VectorWritable} values, one record per slice, keyed by the slice index.
+   * The writer is closed even when iterating the matrix or appending a record fails.
+   *
+   * @param outputDir path handed to {@code SequenceFile.createWriter} as the file to create
+   * @param conf      configuration used to resolve the file system and create the writer
+   * @param matrix    slices to write, in iteration order
+   * @throws IOException if the file system cannot be obtained or writing fails
+   */
+  public static void write(Path outputDir, Configuration conf, Iterable<MatrixSlice> matrix) throws IOException {
+    FileSystem fs = outputDir.getFileSystem(conf);
+    // The two writables are reused for every record.
+    IntWritable rowIndex = new IntWritable();
+    VectorWritable row = new VectorWritable();
+    try (SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf, outputDir,
+        IntWritable.class, VectorWritable.class)) {
+      for (MatrixSlice slice : matrix) {
+        rowIndex.set(slice.index());
+        row.set(slice.vector());
+        writer.append(rowIndex, row);
+      }
+    }
+  }
+
+}