Posted to commits@mahout.apache.org by dl...@apache.org on 2011/03/07 07:34:13 UTC
svn commit: r1078694 [1/2] - in /mahout/trunk: core/
core/src/main/java/org/apache/mahout/common/
core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/
core/src/test/java/org/apache/mahout/math/hadoop/stochasticsvd/ math/
math/src/main/java/o...
Author: dlyubimov
Date: Mon Mar 7 06:34:12 2011
New Revision: 1078694
URL: http://svn.apache.org/viewvc?rev=1078694&view=rev
Log:
MAHOUT-593: initial addition of Stochastic SVD method (related issue is MAHOUT-376)
Added:
mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/
mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/BBtJob.java
mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/BtJob.java
mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/DenseBlockWritable.java
mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/GivensThinSolver.java
mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/Omega.java
mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/PartialRowEmitter.java
mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/QJob.java
mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/SSVDCli.java
mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/SSVDPrototype.java
mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/SSVDSolver.java
mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/UJob.java
mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/UpperTriangular.java
mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/VJob.java
mahout/trunk/core/src/test/java/org/apache/mahout/math/hadoop/stochasticsvd/
mahout/trunk/core/src/test/java/org/apache/mahout/math/hadoop/stochasticsvd/LocalSSVDSolverTest.java
mahout/trunk/core/src/test/java/org/apache/mahout/math/hadoop/stochasticsvd/SSVDPrototypeTest.java
mahout/trunk/math/src/main/java/org/apache/mahout/math/ssvd/
mahout/trunk/math/src/main/java/org/apache/mahout/math/ssvd/EigenSolverWrapper.java
mahout/trunk/src/conf/ssvd.props
Modified:
mahout/trunk/core/pom.xml
mahout/trunk/core/src/main/java/org/apache/mahout/common/IOUtils.java
mahout/trunk/math/pom.xml
mahout/trunk/src/conf/driver.classes.props
Modified: mahout/trunk/core/pom.xml
URL: http://svn.apache.org/viewvc/mahout/trunk/core/pom.xml?rev=1078694&r1=1078693&r2=1078694&view=diff
==============================================================================
--- mahout/trunk/core/pom.xml (original)
+++ mahout/trunk/core/pom.xml Mon Mar 7 06:34:12 2011
@@ -212,9 +212,11 @@
</dependency>
<dependency>
- <groupId>commons-math</groupId>
+<!-- <groupId>commons-math</groupId>-->
+ <groupId>org.apache.commons</groupId>
<artifactId>commons-math</artifactId>
- <version>1.2</version>
+ <!-- version>1.2</version -->
+ <version>2.1</version>
</dependency>
<dependency>
Modified: mahout/trunk/core/src/main/java/org/apache/mahout/common/IOUtils.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/common/IOUtils.java?rev=1078694&r1=1078693&r2=1078694&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/common/IOUtils.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/common/IOUtils.java Mon Mar 7 06:34:12 2011
@@ -18,11 +18,13 @@
package org.apache.mahout.common;
import java.io.Closeable;
+import java.io.File;
import java.io.IOException;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
+import java.util.Collection;
import com.google.common.base.Preconditions;
import org.slf4j.Logger;
@@ -101,4 +103,92 @@ public final class IOUtils {
quietClose(connection);
}
+ /**
+ * Makes sure to close all resources, logs all problems that occurred, clears
+ * <code>closeables</code> (to prevent repeated close attempts), and re-throws
+ * the last exception at the end. Helps with resource scope management (e.g.
+ * compositions of {@link Closeable} objects).
+ * <p>
+ * Typical pattern:
+ * <p>
+ *
+ * <pre>
+ * LinkedList<Closeable> closeables = new LinkedList<Closeable>();
+ * try {
+ * InputStream stream1 = new FileInputStream(...);
+ * closeables.addFirst(stream1);
+ * ...
+ * InputStream streamN = new FileInputStream(...);
+ * closeables.addFirst(streamN);
+ * ...
+ * } finally {
+ * IOUtils.close(closeables);
+ * }
+ * </pre>
+ *
+ * @param closeables
+ * must be a modifiable collection of {@link Closeable}s
+ * @throws IOException
+ * the last exception (if any) of all closed resources
+ *
+ *
+ */
+ public static void close(Collection<? extends Closeable> closeables)
+ throws IOException {
+ Throwable lastThr = null;
+
+ for (Closeable closeable : closeables) {
+ try {
+ closeable.close();
+ } catch (Throwable thr) {
+ log.error(thr.getMessage(), thr);
+ lastThr = thr;
+ }
+ }
+
+ // make sure we don't double-close
+ // but that has to be modifiable collection
+ closeables.clear();
+
+ if (lastThr != null) {
+ if (lastThr instanceof IOException) {
+ throw (IOException) lastThr;
+ } else if (lastThr instanceof RuntimeException) {
+ throw (RuntimeException) lastThr;
+ } else if (lastThr instanceof Error) {
+ throw (Error) lastThr;
+ } else {
+ // should not happen
+ throw (IOException) new IOException("Unexpected exception during close")
+ .initCause(lastThr);
+ }
+ }
+
+ }
+
+
+ /**
+ * For temporary files, a file may be considered a {@link Closeable} too:
+ * the file is deleted on close, and thus the disk resource is released
+ * ('closed').
+ *
+ *
+ */
+ public static class DeleteFileOnClose implements Closeable {
+
+ private File file;
+
+ public DeleteFileOnClose(File file) {
+ this.file = file;
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (file.isFile())
+ file.delete();
+ }
+
+ }
+
}
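
For illustration, a minimal sketch of how the new close() and DeleteFileOnClose
compose for temp-file scope management (the sketch class and file names are
hypothetical, not part of this change):

import java.io.Closeable;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.LinkedList;

import org.apache.mahout.common.IOUtils;

public class TempFileScopeSketch {
  public static void main(String[] args) throws IOException {
    LinkedList<Closeable> closeables = new LinkedList<Closeable>();
    try {
      File tmp = File.createTempFile("ssvd-sketch", ".tmp");
      // register the file itself so its disk space is reclaimed on close
      closeables.addFirst(new IOUtils.DeleteFileOnClose(tmp));
      FileOutputStream out = new FileOutputStream(tmp);
      // addFirst puts the stream ahead of the file, so it is closed first
      closeables.addFirst(out);
      out.write(new byte[] { 1, 2, 3 });
    } finally {
      // closes everything, logs all failures, re-throws only the last one
      IOUtils.close(closeables);
    }
  }
}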
Added: mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/BBtJob.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/BBtJob.java?rev=1078694&view=auto
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/BBtJob.java (added)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/BBtJob.java Mon Mar 7 06:34:12 2011
@@ -0,0 +1,188 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.hadoop.stochasticsvd;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
+import org.apache.hadoop.io.compress.DefaultCodec;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.mahout.math.DenseVector;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.VectorWritable;
+
+/**
+ * Compute B*Bt using the simple fact that B*Bt is the sum of outer products
+ * of B's columns: B*Bt = sum_i ( B_(*i) x B_(*i)' ).
+ *
+ */
+public class BBtJob {
+
+ public static final String OUTPUT_BBT = "part";
+
+ public static void run(Configuration conf, Path btPath, Path outputPath,
+ int numReduceTasks) throws IOException, ClassNotFoundException,
+ InterruptedException {
+
+ Job job = new Job(conf);
+ job.setJobName("BBt-job");
+ job.setJarByClass(BBtJob.class);
+
+ // input
+ job.setInputFormatClass(SequenceFileInputFormat.class);
+ FileInputFormat.setInputPaths(job, btPath);
+
+ // map
+ job.setMapOutputKeyClass(IntWritable.class);
+ job.setMapOutputValueClass(VectorWritable.class);
+ job.setMapperClass(BBtMapper.class);
+ job.setReducerClass(BBtReducer.class);
+
+ // combiner and reducer
+ job.setOutputKeyClass(IntWritable.class);
+ job.setOutputValueClass(VectorWritable.class);
+
+ // output
+ job.setOutputFormatClass(SequenceFileOutputFormat.class);
+ FileOutputFormat.setOutputPath(job, outputPath);
+ SequenceFileOutputFormat.setOutputCompressionType(job,
+ CompressionType.BLOCK);
+ SequenceFileOutputFormat.setOutputCompressorClass(job, DefaultCodec.class);
+ job.getConfiguration().set("mapreduce.output.basename", OUTPUT_BBT);
+
+ // run
+ job.submit();
+ job.waitForCompletion(false);
+ if (!job.isSuccessful())
+ throw new IOException("BBt job failed.");
+ return;
+
+ }
+
+ // actually, the B*Bt matrix is small enough that we don't need to rely on
+ // a combiner.
+ public static class BBtMapper extends
+ Mapper<IntWritable, VectorWritable, IntWritable, VectorWritable> {
+
+ private VectorWritable vw = new VectorWritable();
+ private IntWritable iw = new IntWritable();
+ private UpperTriangular bbtPartial; // all partial BBt products are
+ // symmetric as well, so an upper-triangular accumulator suffices
+
+ @Override
+ protected void map(IntWritable key, VectorWritable value, Context context)
+ throws IOException, InterruptedException {
+ Vector btVec = value.get();
+ int kp = btVec.size();
+ if (bbtPartial == null) {
+ bbtPartial = new UpperTriangular(kp);
+ }
+ for (int i = 0; i < kp; i++) {
+ // this approach should reduce GC churn rate
+ double mul = btVec.getQuick(i);
+ for (int j = i; j < kp; j++)
+ bbtPartial.setQuick(i, j,
+ bbtPartial.getQuick(i, j) + mul * btVec.getQuick(j));
+ }
+ }
+
+ @Override
+ protected void cleanup(Context context) throws IOException,
+ InterruptedException {
+ if (bbtPartial != null) {
+ iw.set(context.getTaskAttemptID().getTaskID().getId());
+ vw.set(new DenseVector(bbtPartial.getData(), true));
+ context.write(iw, vw);
+ }
+ super.cleanup(context);
+ }
+ }
+
+ public static class BBtReducer extends
+ Reducer<IntWritable, VectorWritable, IntWritable, VectorWritable> {
+
+ private double[] accum;
+
+ @Override
+ protected void cleanup(Context context) throws IOException,
+ InterruptedException {
+ try {
+ if (accum != null)
+ context.write(new IntWritable(), new VectorWritable(new DenseVector(
+ accum, true)));
+ } finally {
+ super.cleanup(context);
+ }
+ }
+
+ @Override
+ protected void reduce(IntWritable iw, Iterable<VectorWritable> ivw,
+ Context ctx) throws IOException, InterruptedException {
+ Iterator<VectorWritable> vwIter = ivw.iterator();
+ Vector bbtPartial = vwIter.next().get();
+ if (accum == null) {
+ accum = new double[bbtPartial.size()];
+ }
+ do {
+ for (int i = 0; i < accum.length; i++)
+ accum[i] += bbtPartial.getQuick(i);
+ } while (vwIter.hasNext() && null != (bbtPartial = vwIter.next().get()));
+ }
+
+ }
+
+ // naive mapper.
+ // public static class BBtMapper extends
+ // Mapper<IntWritable, VectorWritable, IntWritable, VectorWritable> {
+ //
+ // private VectorWritable m_vw = new VectorWritable();
+ // private IntWritable m_iw = new IntWritable();
+ // private DenseVector m_v;
+ // double[] m_vRow;
+ //
+ // @Override
+ // protected void map(IntWritable key, VectorWritable value,
+ // Context context) throws IOException, InterruptedException {
+ // Vector btVec = value.get();
+ // int kp = btVec.size();
+ // if (m_v == null) {
+ // m_v = new DenseVector(m_vRow = new double[kp], true);
+ // m_vw.set(m_v);
+ // }
+ // for (int i = 0; i < kp; i++) {
+ // // this approach should reduce GC churn rate
+ // double mul = btVec.getQuick(i);
+ // for (int j = 0; j < kp; j++)
+ // m_vRow[j] = mul * btVec.getQuick(j);
+ // m_iw.set(i);
+ // context.write(m_iw, m_vw);
+ // }
+ // }
+ // }
+
+}
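
The identity the BBt mapper exploits can be checked in a few lines of plain
Java: each row of Bt (a column of B) contributes one rank-1 outer product, and
B*Bt is their sum. A self-contained sketch (the class name is illustrative):

public class BBtIdentityCheck {
  public static void main(String[] args) {
    // B is 2x3; the rows of Bt are B's columns
    double[][] bt = { { 1, 2 }, { 3, 4 }, { 5, 6 } };
    double[][] bbt = new double[2][2];
    // accumulate the rank-1 outer products, as BBtMapper does
    for (double[] btRow : bt)
      for (int i = 0; i < 2; i++)
        for (int j = 0; j < 2; j++)
          bbt[i][j] += btRow[i] * btRow[j];
    // bbt is now { { 35, 44 }, { 44, 56 } }, i.e. B * B'
    System.out.println(bbt[0][0] + " " + bbt[0][1] + " " + bbt[1][1]);
  }
}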
Added: mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/BtJob.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/BtJob.java?rev=1078694&view=auto
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/BtJob.java (added)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/BtJob.java Mon Mar 7 06:34:12 2011
@@ -0,0 +1,303 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.hadoop.stochasticsvd;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.compress.DefaultCodec;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.lib.MultipleOutputs;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.mahout.math.DenseVector;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.VectorWritable;
+import org.apache.mahout.math.hadoop.stochasticsvd.QJob.QJobKeyWritable;
+
+/**
+ * Bt job. For details, see working notes in MAHOUT-376.
+ *
+ */
+@SuppressWarnings("deprecation")
+public class BtJob {
+
+ public static final String OUTPUT_Q = "Q";
+ public static final String OUTPUT_BT = "part";
+ public static final String PROP_QJOB_PATH = "ssvd.QJob.path";
+
+ public static class BtMapper extends
+ Mapper<Writable, VectorWritable, IntWritable, VectorWritable> {
+
+ private SequenceFile.Reader qInput;
+ private List<UpperTriangular> mRs = new ArrayList<UpperTriangular>();
+ private int blockNum;
+ private double[][] mQt;
+ private int cnt;
+ private int r;
+ private MultipleOutputs outputs;
+ private IntWritable btKey = new IntWritable();
+ private VectorWritable btValue = new VectorWritable();
+ private int kp;
+ private VectorWritable qRowValue = new VectorWritable();
+ private int qCount; // debug
+
+ void loadNextQt(Context ctx) throws IOException, InterruptedException {
+ QJobKeyWritable key = new QJobKeyWritable();
+ DenseBlockWritable v = new DenseBlockWritable();
+
+ boolean more = qInput.next(key, v);
+ assert more;
+
+ mQt = GivensThinSolver.computeQtHat(v.getBlock(), blockNum == 0 ? 0
+ : 1, new GivensThinSolver.DeepCopyUTIterator(mRs.iterator()));
+ r = mQt[0].length;
+ kp = mQt.length;
+ if (btValue.get() == null)
+ btValue.set(new DenseVector(kp));
+ if (qRowValue.get() == null)
+ qRowValue.set(new DenseVector(kp));
+
+ // also output QHat -- although we don't know the A labels there. Is it
+ // important?
+ // DenseVector qRow = new DenseVector(m_kp);
+ // IntWritable oKey = new IntWritable();
+ // VectorWritable oV = new VectorWritable();
+ //
+ // for ( int i = m_r-1; i>=0; i-- ) {
+ // for ( int j= 0; j < m_kp; j++ )
+ // qRow.setQuick(j, m_qt[j][i]);
+ // oKey.set((m_blockNum<<20)+m_r-i-1);
+ // oV.set(qRow);
+ // // so the block #s range is thus 0..2048, and number of rows per block
+ // is 0..2^20.
+ // // since we are not really sending it out to sort (it is a 'side
+ // file'),
+ // // it doesn't matter if it overflows.
+ // m_outputs.write( OUTPUT_Q, oKey, oV);
+ // }
+ qCount++;
+ }
+
+ @Override
+ protected void cleanup(Context context) throws IOException,
+ InterruptedException {
+
+ if (qInput != null)
+ qInput.close();
+ if (outputs != null)
+ outputs.close();
+ super.cleanup(context);
+ }
+
+ @Override
+ @SuppressWarnings ({"unchecked"})
+ protected void map(Writable key, VectorWritable value, Context context)
+ throws IOException, InterruptedException {
+ if (mQt != null && cnt++ == r)
+ mQt = null;
+ if (mQt == null) {
+ loadNextQt(context);
+ cnt = 1;
+ }
+
+ // output Bt outer products
+ Vector aRow = value.get();
+ int qRowIndex = r - cnt; // because QHats are initially stored in
+ // reverse
+ Vector qRow = qRowValue.get();
+ for (int j = 0; j < kp; j++)
+ qRow.setQuick(j, mQt[j][qRowIndex]);
+
+      // make sure Qs are inheriting A row labels
+      outputs.getCollector(OUTPUT_Q, null).collect(key, qRowValue);
+
+ int n = aRow.size();
+ Vector btRow = btValue.get();
+ for (int i = 0; i < n; i++) {
+ double mul = aRow.getQuick(i);
+ for (int j = 0; j < kp; j++)
+ btRow.setQuick(j, mul * qRow.getQuick(j));
+ btKey.set(i);
+ context.write(btKey, btValue);
+ }
+
+ }
+
+ @Override
+ protected void setup(Context context) throws IOException,
+ InterruptedException {
+ super.setup(context);
+
+ Path qJobPath = new Path(context.getConfiguration().get(PROP_QJOB_PATH));
+
+ FileSystem fs = FileSystem.get(context.getConfiguration());
+      // actually this is kind of dangerous, because this routine thinks we
+      // need to create a file name for our current job, and that will use
+      // -m-; so it is just serendipity that we are calling it from a mapper
+      // too, as the QJob did.
+ Path qInputPath = new Path(qJobPath, FileOutputFormat.getUniqueFile(
+ context, QJob.OUTPUT_QHAT, ""));
+ qInput = new SequenceFile.Reader(fs, qInputPath,
+ context.getConfiguration());
+
+ blockNum = context.getTaskAttemptID().getTaskID().getId();
+
+ // read all r files _in order of task ids_, i.e. partitions
+ Path rPath = new Path(qJobPath, QJob.OUTPUT_R + "-*");
+ FileStatus[] rFiles = fs.globStatus(rPath);
+
+ if (rFiles == null)
+ throw new IOException("Can't find R inputs ");
+
+ Arrays.sort(rFiles, SSVDSolver.partitionComparator);
+
+ QJobKeyWritable rKey = new QJobKeyWritable();
+ VectorWritable rValue = new VectorWritable();
+
+ int block = 0;
+ for (FileStatus fstat : rFiles) {
+ SequenceFile.Reader rReader = new SequenceFile.Reader(fs,
+ fstat.getPath(), context.getConfiguration());
+ try {
+ rReader.next(rKey, rValue);
+ } finally {
+ rReader.close();
+ }
+ if (block < blockNum && block > 0)
+ GivensThinSolver.mergeR(mRs.get(0),
+ new UpperTriangular(rValue.get()));
+ else
+ mRs.add(new UpperTriangular(rValue.get()));
+ block++;
+ }
+ outputs = new MultipleOutputs(new JobConf(context.getConfiguration()));
+ }
+ }
+
+ public static class OuterProductReducer extends
+ Reducer<IntWritable, VectorWritable, IntWritable, VectorWritable> {
+
+ private VectorWritable oValue = new VectorWritable();
+ private DenseVector accum;
+
+ @Override
+ protected void reduce(IntWritable key, Iterable<VectorWritable> values,
+ Context ctx) throws IOException, InterruptedException {
+ Iterator<VectorWritable> vwIter = values.iterator();
+
+ Vector vec = vwIter.next().get();
+ if (accum == null || accum.size() != vec.size()) {
+ accum = new DenseVector(vec);
+ oValue.set(accum);
+ } else
+ accum.assign(vec);
+
+ while (vwIter.hasNext())
+ accum.addAll(vwIter.next().get());
+ ctx.write(key, oValue);
+ }
+
+ }
+
+  public static void run(Configuration conf, Path[] inputPathA,
+ Path inputPathQJob, Path outputPath, int minSplitSize, int k, int p,
+ int numReduceTasks, Class<? extends Writable> labelClass)
+ throws ClassNotFoundException, InterruptedException, IOException {
+
+ JobConf oldApiJob = new JobConf(conf);
+ MultipleOutputs.addNamedOutput(oldApiJob, OUTPUT_Q,
+ org.apache.hadoop.mapred.SequenceFileOutputFormat.class, labelClass,
+ VectorWritable.class);
+
+ Job job = new Job(oldApiJob);
+ job.setJobName("Bt-job");
+ job.setJarByClass(BtJob.class);
+
+ job.setInputFormatClass(SequenceFileInputFormat.class);
+ job.setOutputFormatClass(SequenceFileOutputFormat.class);
+ FileInputFormat.setInputPaths(job, inputPathA);
+ if (minSplitSize > 0)
+ SequenceFileInputFormat.setMinInputSplitSize(job, minSplitSize);
+ FileOutputFormat.setOutputPath(job, outputPath);
+
+ // MultipleOutputs.addNamedOutput(job, OUTPUT_Bt,
+ // SequenceFileOutputFormat.class,
+ // QJobKeyWritable.class,QJobValueWritable.class);
+
+ // Warn: tight hadoop integration here:
+ job.getConfiguration().set("mapreduce.output.basename", OUTPUT_BT);
+ SequenceFileOutputFormat.setCompressOutput(job, true);
+ SequenceFileOutputFormat.setOutputCompressorClass(job, DefaultCodec.class);
+ SequenceFileOutputFormat.setOutputCompressionType(job,
+ CompressionType.BLOCK);
+
+ job.setMapOutputKeyClass(IntWritable.class);
+ job.setMapOutputValueClass(VectorWritable.class);
+
+ job.setOutputKeyClass(IntWritable.class);
+ job.setOutputValueClass(VectorWritable.class);
+
+ job.setMapperClass(BtMapper.class);
+ job.setCombinerClass(OuterProductReducer.class);
+ job.setReducerClass(OuterProductReducer.class);
+ // job.setPartitionerClass(QPartitioner.class);
+
+ // job.getConfiguration().setInt(QJob.PROP_AROWBLOCK_SIZE,aBlockRows );
+ // job.getConfiguration().setLong(PROP_OMEGA_SEED, seed);
+ job.getConfiguration().setInt(QJob.PROP_K, k);
+ job.getConfiguration().setInt(QJob.PROP_P, p);
+ job.getConfiguration().set(PROP_QJOB_PATH, inputPathQJob.toString());
+
+    // unlike in QJob, the reduce step here is real: OuterProductReducer
+    // (also used as the combiner) sums the partial Bt outer products, and
+    // numReduceTasks controls how the Bt output is partitioned.
+ job.setNumReduceTasks(numReduceTasks);
+
+ job.submit();
+ job.waitForCompletion(false);
+
+ if (!job.isSuccessful())
+ throw new IOException("Bt job unsuccessful.");
+
+ }
+
+}
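
For the outer-product step above: per A-row, BtMapper emits, for each column
index i, a partial Bt row equal to aRow[i] * qRow; the combiner/reducer then
sums the partials keyed by i. A standalone sketch of that computation on plain
arrays (names illustrative):

public class BtOuterProductSketch {
  public static void main(String[] args) {
    double[] aRow = { 2.0, 0.0, -1.0 }; // one row of A (n = 3)
    double[] qRow = { 0.5, 0.25 };      // corresponding row of Q (kp = 2)
    // each column index i of A yields one partial Bt row: aRow[i] * qRow
    double[][] btPartial = new double[aRow.length][qRow.length];
    for (int i = 0; i < aRow.length; i++)
      for (int j = 0; j < qRow.length; j++)
        btPartial[i][j] = aRow[i] * qRow[j];
    // in the MR job these rows are keyed by i and summed over all A rows,
    // which yields Bt = A' * Q
  }
}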
Added: mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/DenseBlockWritable.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/DenseBlockWritable.java?rev=1078694&view=auto
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/DenseBlockWritable.java (added)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/DenseBlockWritable.java Mon Mar 7 06:34:12 2011
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.mahout.math.hadoop.stochasticsvd;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Arrays;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.mahout.math.MatrixWritable;
+
+/**
+ * Ad-hoc substitution for {@link MatrixWritable}. Perhaps most useful for
+ * situations with mostly dense data (such as Q-blocks); reduces GC churn by
+ * reusing the same block memory between loads and writes.
+ * <p>
+ *
+ * In the case of Q blocks, it doesn't even matter whether the data is dense,
+ * because we need to unpack it into dense form for fast access in
+ * computations anyway; and even if it is not so dense, the block compressor
+ * in sequence files will take care of the serialized size.
+ * <p>
+ *
+ *
+ */
+public class DenseBlockWritable implements Writable {
+ double[][] block;
+
+ public DenseBlockWritable() {
+ super();
+ }
+
+ public void setBlock(double[][] block) {
+ this.block = block;
+ }
+
+ public double[][] getBlock() {
+ return block;
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ int m = in.readInt();
+ int n = in.readInt();
+ if (block == null)
+ block = new double[m][0];
+ else if (block.length != m)
+ block = Arrays.copyOf(block, m);
+ for (int i = 0; i < m; i++) {
+ if (block[i] == null || block[i].length != n)
+ block[i] = new double[n];
+ for (int j = 0; j < n; j++)
+ block[i][j] = in.readDouble();
+
+ }
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ int m = block.length;
+ int n = block.length == 0 ? 0 : block[0].length;
+
+ out.writeInt(m);
+ out.writeInt(n);
+ for (int i = 0; i < m; i++)
+ for (int j = 0; j < n; j++)
+ out.writeDouble(block[i][j]);
+ }
+
+}
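
A minimal round-trip sketch of DenseBlockWritable (assuming only the class
above and java.io; the driver class is hypothetical):

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

import org.apache.mahout.math.hadoop.stochasticsvd.DenseBlockWritable;

public class DenseBlockRoundTrip {
  public static void main(String[] args) throws IOException {
    DenseBlockWritable out = new DenseBlockWritable();
    out.setBlock(new double[][] { { 1, 2 }, { 3, 4 } });

    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    out.write(new DataOutputStream(bytes));

    DenseBlockWritable in = new DenseBlockWritable();
    in.readFields(new DataInputStream(
        new ByteArrayInputStream(bytes.toByteArray())));
    // in.getBlock() now equals the written block; a second readFields() call
    // on a same-shape block reuses the existing arrays instead of reallocating
  }
}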
Added: mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/GivensThinSolver.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/GivensThinSolver.java?rev=1078694&view=auto
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/GivensThinSolver.java (added)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/GivensThinSolver.java Mon Mar 7 06:34:12 2011
@@ -0,0 +1,621 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.hadoop.stochasticsvd;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.mahout.math.AbstractVector;
+import org.apache.mahout.math.DenseVector;
+import org.apache.mahout.math.Matrix;
+import org.apache.mahout.math.Vector;
+
+/**
+ * Givens Thin solver. Standard Givens operations are reordered in a way that
+ * helps us push them through MapReduce operations in a block fashion.
+ *
+ *
+ */
+public class GivensThinSolver {
+
+ // private static final double s_epsilon = 1e-10;
+
+ // private double[][] m_rTilde;
+ // private Vector m_aRowV;
+ private double[] vARow;
+ private double[] vQtRow;
+ // private UpperTriangular m_rTilde;
+ // private TriangularRowView m_rTildeRowView, m_rTildeRowView2;
+ private double[][] mQt;
+ private double[][] mR;
+ private int qtStartRow;
+ private int rStartRow;
+ private int m;
+  private int n; // m = row count, n = column count, m >= n
+ private int cnt;
+ private double[] cs = new double[2];
+
+ public GivensThinSolver(int m, int n) {
+ super();
+
+ if (!(m >= n))
+ throw new IllegalArgumentException("Givens thin QR: must be true: m>=n");
+
+ this.m = m;
+ this.n = n;
+
+ mQt = new double[n][];
+ mR = new double[n][];
+ vARow = new double[n];
+ // m_aRowV = new DenseVector(m_aRow, true);
+ vQtRow = new double[m];
+
+ for (int i = 0; i < n; i++) {
+ mQt[i] = new double[this.m];
+ mR[i] = new double[this.n];
+ }
+ cnt = 0;
+ }
+
+ public void reset() {
+ cnt = 0;
+ }
+
+ public void solve(Matrix a) {
+
+ assert a.rowSize() == m;
+ assert a.columnSize() == n;
+
+ double[] aRow = new double[n];
+ for (int i = 0; i < m; i++) {
+ Vector aRowV = a.getRow(i);
+ for (int j = 0; j < n; j++)
+ aRow[j] = aRowV.getQuick(j);
+ appendRow(aRow);
+ }
+ }
+
+ public boolean isFull() {
+ return cnt == m;
+ }
+
+ public int getM() {
+ return m;
+ }
+
+ public int getN() {
+ return n;
+ }
+
+ public int getCnt() {
+ return cnt;
+ }
+
+ public void adjust(int newM) {
+ if (newM == m)
+ return; // no adjustment is required.
+ if (newM < n)
+ throw new IllegalArgumentException("new m can't be less than n");
+ if (newM < cnt)
+ throw new IllegalArgumentException(
+ "new m can't be less than rows accumulated");
+ vQtRow = new double[newM];
+
+ // grow or shrink qt rows
+ if (newM > m) {
+ // grow qt rows
+ for (int i = 0; i < n; i++) {
+ mQt[i] = Arrays.copyOf(mQt[i], newM);
+ System.arraycopy(mQt[i], 0, mQt[i], newM - m, m);
+ Arrays.fill(mQt[i], 0, newM - m, 0);
+ }
+ } else {
+ // shrink qt rows
+ for (int i = 0; i < n; i++) {
+ mQt[i] = Arrays.copyOfRange(mQt[i], m - newM, m);
+ }
+ }
+
+ m = newM;
+
+ }
+
+ public void trim() {
+ adjust(cnt);
+ }
+
+ /**
+   * API for row-by-row addition.
+   *
+   * @param aRow dense row to append (length n)
+ */
+ public void appendRow(double[] aRow) {
+ if (cnt >= m)
+ throw new RuntimeException(
+ "thin QR solver fed more rows than initialized for");
+ try {
+      // moving pointers around is inefficient, but for sanity's sake I am
+      // keeping it this way so I don't have to guess how the R-tilde index
+      // maps to the actual block index
+
+ Arrays.fill(vQtRow, 0);
+ vQtRow[m - cnt - 1] = 1;
+ int height = cnt > n ? n : cnt;
+ System.arraycopy(aRow, 0, vARow, 0, n);
+
+ if (height > 0) {
+ givens(vARow[0], getRRow(0)[0], cs);
+ applyGivensInPlace(cs[0], cs[1], vARow, getRRow(0), 0, n);
+ applyGivensInPlace(cs[0], cs[1], vQtRow, getQtRow(0), 0, m);
+ }
+
+ for (int i = 1; i < height; i++) {
+ givens(getRRow(i - 1)[i], getRRow(i)[i], cs);
+ applyGivensInPlace(cs[0], cs[1], getRRow(i - 1), getRRow(i), i,
+ n - i);
+ applyGivensInPlace(cs[0], cs[1], getQtRow(i - 1), getQtRow(i), 0,
+ m);
+ }
+      // push qt and r-tilde 1 row down;
+      // just swap the references to reduce GC churning
+ pushQtDown();
+ double[] swap = getQtRow(0);
+ setQtRow(0, vQtRow);
+ vQtRow = swap;
+
+      // the old triangular push below is terribly inefficient: for each row
+      // we are basically moving ~2-4Mb of memory around
+ // for (int i = m_n - 1; i > 0; i--) {
+ // // copy (i-1)th row into i-th row ignoring main diagonal item
+ // // which must be 0 now
+ // assert m_rTilde.getQuick(i - 1, i - 1) <= s_epsilon;
+ // for (int j = i; j < m_n; j++)
+ // m_rTilde.setQuick(i, j, m_rTilde.getQuick(i - 1, j));
+ // }
+ // for (int i = 0; i < m_n; i++)
+ // m_rTilde.setQuick(0, i, m_aRow[i]);
+ pushRDown();
+ swap = getRRow(0);
+ setRRow(0, vARow);
+ vARow = swap;
+
+ } finally {
+ cnt++;
+ }
+ }
+
+ private double[] getQtRow(int row) {
+
+ return mQt[(row += qtStartRow) >= n ? row - n : row];
+ }
+
+ private void setQtRow(int row, double[] qtRow) {
+ mQt[(row += qtStartRow) >= n ? row - n : row] = qtRow;
+ }
+
+ private void pushQtDown() {
+ qtStartRow = qtStartRow == 0 ? n - 1 : qtStartRow - 1;
+ }
+
+ private double[] getRRow(int row) {
+ return mR[(row += rStartRow) >= n ? row - n : row];
+ }
+
+ private void setRRow(int row, double[] rrow) {
+ mR[(row += rStartRow) >= n ? row - n : row] = rrow;
+ }
+
+ private void pushRDown() {
+ rStartRow = rStartRow == 0 ? n - 1 : rStartRow - 1;
+ }
+
+  // warning: both of these actually return n+1 rows, with the last one being
+  // of no interest.
+ public UpperTriangular getRTilde() {
+ UpperTriangular packedR = new UpperTriangular(n);
+ for (int i = 0; i < n; i++)
+ packedR.assignRow(i, getRRow(i));
+ return packedR;
+ }
+
+ public double[][] getThinQtTilde() {
+ if (qtStartRow != 0) {
+ // rotate qt rows into place
+ double[][] qt = new double[n][]; // double[~500][], once per block, not
+ // a big deal.
+ System.arraycopy(mQt, qtStartRow, qt, 0, n - qtStartRow);
+ System.arraycopy(mQt, 0, qt, n - qtStartRow, qtStartRow);
+ return qt;
+ }
+ return mQt;
+ }
+
+ public static void applyGivensInPlace(double c, double s, double[] row1,
+ double[] row2, int offset, int len) {
+
+ int n = offset + len;
+ for (int j = offset; j < n; j++) {
+ double tau1 = row1[j];
+ double tau2 = row2[j];
+ row1[j] = c * tau1 - s * tau2;
+ row2[j] = s * tau1 + c * tau2;
+ }
+ }
+
+ public static void applyGivensInPlace(double c, double s, Vector row1,
+ Vector row2, int offset, int len) {
+
+ int n = offset + len;
+ for (int j = offset; j < n; j++) {
+ double tau1 = row1.getQuick(j);
+ double tau2 = row2.getQuick(j);
+ row1.setQuick(j, c * tau1 - s * tau2);
+ row2.setQuick(j, s * tau1 + c * tau2);
+ }
+ }
+
+ public static void applyGivensInPlace(double c, double s, int i, int k,
+ Matrix mx) {
+ int n = mx.columnSize();
+
+ for (int j = 0; j < n; j++) {
+ double tau1 = mx.get(i, j);
+ double tau2 = mx.get(k, j);
+ mx.set(i, j, c * tau1 - s * tau2);
+ mx.set(k, j, s * tau1 + c * tau2);
+ }
+ }
+
+ public static void fromRho(double rho, double[] csOut) {
+ if (rho == 1) {
+ csOut[0] = 0;
+ csOut[1] = 1;
+ return;
+ }
+ if (Math.abs(rho) < 1) {
+ csOut[1] = 2 * rho;
+ csOut[0] = Math.sqrt(1 - csOut[1] * csOut[1]);
+ return;
+ }
+ csOut[0] = 2 / rho;
+ csOut[1] = Math.sqrt(1 - csOut[0] * csOut[0]);
+ }
+
+ public static void givens(double a, double b, double[] csOut) {
+ if (b == 0) {
+ csOut[0] = 1;
+ csOut[1] = 0;
+ return;
+ }
+ if (Math.abs(b) > Math.abs(a)) {
+ double tau = -a / b;
+ csOut[1] = 1 / Math.sqrt(1 + tau * tau);
+ csOut[0] = csOut[1] * tau;
+ } else {
+ double tau = -b / a;
+ csOut[0] = 1 / Math.sqrt(1 + tau * tau);
+ csOut[1] = csOut[0] * tau;
+ }
+ }
+
+ public static double toRho(double c, double s) {
+ if (c == 0)
+ return 1;
+ if (Math.abs(s) < Math.abs(c))
+ return Math.signum(c) * s / 2;
+ else
+ return Math.signum(s) * 2 / c;
+ }
+
+ public static void mergeR(UpperTriangular r1, UpperTriangular r2) {
+ TriangularRowView r1Row = new TriangularRowView(r1);
+ TriangularRowView r2Row = new TriangularRowView(r2);
+
+ int kp = r1Row.size();
+ assert kp == r2Row.size();
+
+ double[] cs = new double[2];
+
+ for (int v = 0; v < kp; v++) {
+ for (int u = v; u < kp; u++) {
+ givens(r1Row.setViewedRow(u).get(u), r2Row.setViewedRow(u - v).get(u),
+ cs);
+ applyGivensInPlace(cs[0], cs[1], r1Row, r2Row, u, kp - u);
+ }
+ }
+ }
+
+ public static void mergeR(double[][] r1, double[][] r2) {
+ int kp = r1[0].length;
+ assert kp == r2[0].length;
+
+ double[] cs = new double[2];
+
+ for (int v = 0; v < kp; v++) {
+ for (int u = v; u < kp; u++) {
+ givens(r1[u][u], r2[u - v][u], cs);
+ applyGivensInPlace(cs[0], cs[1], r1[u], r2[u - v], u, kp - u);
+ }
+ }
+
+ }
+
+ public static void mergeRonQ(UpperTriangular r1, UpperTriangular r2,
+ double[][] qt1, double[][] qt2) {
+ TriangularRowView r1Row = new TriangularRowView(r1);
+ TriangularRowView r2Row = new TriangularRowView(r2);
+ int kp = r1Row.size();
+ assert kp == r2Row.size();
+ assert kp == qt1.length;
+ assert kp == qt2.length;
+
+ int r = qt1[0].length;
+ assert qt2[0].length == r;
+
+ double[] cs = new double[2];
+
+ for (int v = 0; v < kp; v++) {
+ for (int u = v; u < kp; u++) {
+ givens(r1Row.setViewedRow(u).get(u), r2Row.setViewedRow(u - v).get(u),
+ cs);
+ applyGivensInPlace(cs[0], cs[1], r1Row, r2Row, u, kp - u);
+ applyGivensInPlace(cs[0], cs[1], qt1[u], qt2[u - v], 0, r);
+ }
+ }
+ }
+
+ public static void mergeRonQ(double[][] r1, double[][] r2, double[][] qt1,
+ double[][] qt2) {
+
+ int kp = r1[0].length;
+ assert kp == r2[0].length;
+ assert kp == qt1.length;
+ assert kp == qt2.length;
+
+ int r = qt1[0].length;
+ assert qt2[0].length == r;
+ double[] cs = new double[2];
+
+    // pairwise givens(a,b) so that the a's come off the main diagonal in r1
+    // and the b's come off the u-th upper subdiagonal in r2.
+ for (int v = 0; v < kp; v++) {
+ for (int u = v; u < kp; u++) {
+ givens(r1[u][u], r2[u - v][u], cs);
+ applyGivensInPlace(cs[0], cs[1], r1[u], r2[u - v], u, kp - u);
+ applyGivensInPlace(cs[0], cs[1], qt1[u], qt2[u - v], 0, r);
+ }
+ }
+ }
+
+ // returns merged Q (which in this case is the qt1)
+ public static double[][] mergeQrUp(double[][] qt1, double[][] r1,
+ double[][] r2) {
+ int kp = qt1.length;
+ int r = qt1[0].length;
+
+ double[][] qTilde = new double[kp][];
+ for (int i = 0; i < kp; i++)
+ qTilde[i] = new double[r];
+ mergeRonQ(r1, r2, qt1, qTilde);
+ return qt1;
+ }
+
+ // returns merged Q (which in this case is the qt1)
+ public static double[][] mergeQrUp(double[][] qt1, UpperTriangular r1,
+ UpperTriangular r2) {
+ int kp = qt1.length;
+ int r = qt1[0].length;
+
+ double[][] qTilde = new double[kp][];
+ for (int i = 0; i < kp; i++)
+ qTilde[i] = new double[r];
+ mergeRonQ(r1, r2, qt1, qTilde);
+ return qt1;
+ }
+
+ public static double[][] mergeQrDown(double[][] r1, double[][] qt2,
+ double[][] r2) {
+ int kp = qt2.length;
+ int r = qt2[0].length;
+
+ double[][] qTilde = new double[kp][];
+ for (int i = 0; i < kp; i++)
+ qTilde[i] = new double[r];
+ mergeRonQ(r1, r2, qTilde, qt2);
+ return qTilde;
+
+ }
+
+ public static double[][] mergeQrDown(UpperTriangular r1, double[][] qt2,
+ UpperTriangular r2) {
+ int kp = qt2.length;
+ int r = qt2[0].length;
+
+ double[][] qTilde = new double[kp][];
+ for (int i = 0; i < kp; i++)
+ qTilde[i] = new double[r];
+ mergeRonQ(r1, r2, qTilde, qt2);
+ return qTilde;
+
+ }
+
+ public static double[][] computeQtHat(double[][] qt, int i,
+ Iterator<UpperTriangular> rIter) {
+ UpperTriangular rTilde = rIter.next();
+ for (int j = 1; j < i; j++)
+ mergeR(rTilde, rIter.next());
+ if (i > 0)
+ qt = mergeQrDown(rTilde, qt, rIter.next());
+ for (int j = i + 1; rIter.hasNext(); j++)
+ qt = mergeQrUp(qt, rTilde, rIter.next());
+ return qt;
+ }
+
+ // test helpers
+ public static boolean isOrthonormal(double[][] qt, boolean insufficientRank,
+ double epsilon) {
+ int n = qt.length;
+ int rank = 0;
+ for (int i = 0; i < n; i++) {
+ Vector ei = new DenseVector(qt[i], true);
+
+ double norm = ei.norm(2);
+
+ if (Math.abs(1 - norm) < epsilon)
+ rank++;
+ else if (Math.abs(norm) > epsilon)
+ return false; // not a rank deficiency, either
+
+ for (int j = 0; j <= i; j++) {
+ Vector ej = new DenseVector(qt[j], true);
+ double dot = ei.dot(ej);
+ if (!(Math.abs((i == j && rank > j ? 1 : 0) - dot) < epsilon))
+ return false;
+ }
+ }
+ return (!insufficientRank && rank == n) || (insufficientRank && rank < n);
+
+ }
+
+ public static boolean isOrthonormalBlocked(Iterable<double[][]> qtHats,
+ boolean insufficientRank, double epsilon) {
+ int n = qtHats.iterator().next().length;
+ int rank = 0;
+ for (int i = 0; i < n; i++) {
+ List<Vector> ei = new ArrayList<Vector>();
+ // Vector e_i=new DenseVector (qt[i],true);
+ for (double[][] qtHat : qtHats)
+ ei.add(new DenseVector(qtHat[i], true));
+
+ double norm = 0;
+ for (Vector v : ei)
+ norm += v.dot(v);
+ norm = Math.sqrt(norm);
+ if (Math.abs(1 - norm) < epsilon)
+ rank++;
+ else if (Math.abs(norm) > epsilon)
+ return false; // not a rank deficiency, either
+
+ for (int j = 0; j <= i; j++) {
+ List<Vector> ej = new ArrayList<Vector>();
+ for (double[][] qtHat : qtHats)
+ ej.add(new DenseVector(qtHat[j], true));
+
+ // Vector e_j = new DenseVector ( qt[j], true);
+ double dot = 0;
+ for (int k = 0; k < ei.size(); k++)
+ dot += ei.get(k).dot(ej.get(k));
+ if (!(Math.abs((i == j && rank > j ? 1 : 0) - dot) < epsilon))
+ return false;
+ }
+ }
+ return (!insufficientRank && rank == n) || (insufficientRank && rank < n);
+
+ }
+
+ private static class TriangularRowView extends AbstractVector {
+ private UpperTriangular viewed;
+ private int rowNum;
+
+ public TriangularRowView(UpperTriangular viewed) {
+ super(viewed.columnSize());
+ this.viewed = viewed;
+
+ }
+
+ TriangularRowView setViewedRow(int row) {
+ rowNum = row;
+ return this;
+ }
+
+ @Override
+ public boolean isDense() {
+ return true;
+ }
+
+ @Override
+ public boolean isSequentialAccess() {
+ return false;
+ }
+
+ @Override
+ public Iterator<Element> iterator() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Iterator<Element> iterateNonZero() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public double getQuick(int index) {
+ return viewed.getQuick(rowNum, index);
+ }
+
+ @Override
+ public Vector like() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void setQuick(int index, double value) {
+ viewed.setQuick(rowNum, index, value);
+
+ }
+
+ @Override
+ public int getNumNondefaultElements() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ protected Matrix matrixLike(int rows, int columns) {
+ throw new UnsupportedOperationException();
+ }
+
+ }
+
+ public static class DeepCopyUTIterator implements Iterator<UpperTriangular> {
+
+ private Iterator<UpperTriangular> delegate;
+
+ public DeepCopyUTIterator(Iterator<UpperTriangular> del) {
+ super();
+ delegate = del;
+ }
+
+ public boolean hasNext() {
+ return delegate.hasNext();
+ }
+
+ public UpperTriangular next() {
+
+ return new UpperTriangular(delegate.next());
+ }
+
+ public void remove() {
+ delegate.remove();
+ }
+
+ }
+
+}
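
A quick sketch of the basic building block used throughout the solver:
givens() picks (c, s) from the leading pair so that the rotation annihilates
the leading element of the second row, and applyGivensInPlace() applies the
rotation to both rows (the driver class is illustrative):

import org.apache.mahout.math.hadoop.stochasticsvd.GivensThinSolver;

public class GivensRotationSketch {
  public static void main(String[] args) {
    double[] row1 = { 3, 1 };
    double[] row2 = { 4, 2 };
    double[] cs = new double[2];
    // pick (c, s) so that the rotation zeroes row2[0]
    GivensThinSolver.givens(row1[0], row2[0], cs);
    GivensThinSolver.applyGivensInPlace(cs[0], cs[1], row1, row2, 0,
        row1.length);
    // row1[0] is now -5 (up to sign, the norm of (3, 4)); row2[0] is 0
  }
}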
Added: mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/Omega.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/Omega.java?rev=1078694&view=auto
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/Omega.java (added)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/Omega.java Mon Mar 7 06:34:12 2011
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.hadoop.stochasticsvd;
+
+import java.util.Arrays;
+import java.util.Random;
+
+import org.apache.mahout.math.SequentialAccessSparseVector;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.Vector.Element;
+
+/**
+ * Simplistic implementation of the Omega matrix in the Stochastic SVD method.
+ *
+ */
+public class Omega {
+
+ private long seed;
+ private Random rnd = new Random();
+ private int kp;
+
+ public Omega(long seed, int k, int p) {
+ super();
+ this.seed = seed;
+ kp = k + p;
+
+ }
+
+ public void accumDots(int aIndex, double aElement, double[] yRow) {
+ rnd.setSeed(getOmegaRowSeed(aIndex, seed, rnd));
+ for (int i = 0; i < kp; i++)
+ yRow[i] += rnd.nextGaussian() * aElement;
+ }
+
+ /**
+   * Compute YRow = ARow * Omega.
+   *
+   * @param aRow
+   *          row of matrix A (size n)
+   * @param yRow
+   *          row of matrix Y (result); must be pre-allocated to size (k+p)
+ */
+ public void computeYRow(Vector aRow, double[] yRow) {
+ assert yRow.length == kp;
+
+ Arrays.fill(yRow, 0);
+ if (aRow instanceof SequentialAccessSparseVector) {
+ int j = 0;
+ for (Element el : aRow) {
+ accumDots(j, el.get(), yRow);
+ j++;
+ }
+
+ } else {
+ int n = aRow.size();
+ for (int j = 0; j < n; j++)
+ accumDots(j, aRow.getQuick(j), yRow);
+ }
+
+ }
+
+ public long getOmegaRowSeed(int omegaRow, long omegaSeed, Random rnd) {
+ rnd.setSeed(omegaSeed);
+ long rowSeed = rnd.nextLong();
+ rnd.setSeed(rowSeed ^ omegaRow);
+ return rowSeed ^ rnd.nextLong();
+
+ }
+
+ public static long murmur64(byte[] val, int offset, int len, long seed) {
+
+ long m = 0xc6a4a7935bd1e995L;
+ int r = 47;
+ long h = seed ^ (len * m);
+
+    int lt = len >>> 3;
+    int end = offset + len; // one past the last byte of the hashed region
+    for (int i = 0; i < lt; i++, offset += 8) {
+ long k = 0;
+ for (int j = 0; j < 8; j++) {
+ k <<= 8;
+ k |= val[offset + j] & 0xff;
+ }
+
+ k *= m;
+ k ^= k >>> r;
+ k *= m;
+
+ h ^= k;
+ h *= m;
+ }
+ long k = 0;
+
+    if (offset < end) {
+      for (; offset < end; offset++) {
+ k <<= 8;
+ k |= val[offset] & 0xff;
+ }
+ h ^= k;
+ h *= m;
+ }
+
+ h ^= h >>> r;
+ h *= m;
+ h ^= h >>> r;
+ return h;
+
+ }
+
+}
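
A short sketch illustrating the key property of Omega: it is never
materialized; each row is regenerated deterministically from (seed, row
index), so two instances with the same seed produce identical projections
(the driver class and seed value are illustrative):

import org.apache.mahout.math.DenseVector;
import org.apache.mahout.math.Vector;
import org.apache.mahout.math.hadoop.stochasticsvd.Omega;

public class OmegaDeterminismSketch {
  public static void main(String[] args) {
    int k = 2;
    int p = 1;
    Vector aRow = new DenseVector(new double[] { 1.0, 0.5, -2.0 });
    double[] y1 = new double[k + p];
    double[] y2 = new double[k + p];
    // two instances seeded identically regenerate the same Omega rows
    new Omega(1234L, k, p).computeYRow(aRow, y1);
    new Omega(1234L, k, p).computeYRow(aRow, y2);
    // y1 and y2 are equal element-wise: each Omega row is drawn on demand
    // from a seed derived from the row index
  }
}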
Added: mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/PartialRowEmitter.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/PartialRowEmitter.java?rev=1078694&view=auto
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/PartialRowEmitter.java (added)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/PartialRowEmitter.java Mon Mar 7 06:34:12 2011
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.hadoop.stochasticsvd;
+
+import java.io.IOException;
+
+import org.apache.mahout.math.Vector;
+
+/**
+ * This is part of the SSVD prototype code.
+ *
+ */
+public interface PartialRowEmitter {
+
+ void emitRow(int rowNum, Vector row) throws IOException;
+
+}
Added: mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/QJob.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/QJob.java?rev=1078694&view=auto
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/QJob.java (added)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/QJob.java Mon Mar 7 06:34:12 2011
@@ -0,0 +1,351 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.hadoop.stochasticsvd;
+
+import java.io.Closeable;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.compress.DefaultCodec;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.lib.MultipleOutputs;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.mahout.common.IOUtils;
+import org.apache.mahout.math.DenseVector;
+import org.apache.mahout.math.VectorWritable;
+
+/**
+ * Compute first level of QHat-transpose blocks.
+ *
+ * See MAHOUT-376 working notes for details.
+ *
+ *
+ */
+@SuppressWarnings("deprecation")
+public class QJob {
+
+ public static final String PROP_OMEGA_SEED = "ssvd.omegaseed";
+ public static final String PROP_K = "ssvd.k";
+ public static final String PROP_P = "ssvd.p";
+ public static final String PROP_AROWBLOCK_SIZE = "ssvd.arowblock.size";
+
+ public static final String OUTPUT_R = "R";
+ public static final String OUTPUT_QHAT = "QHat";
+ // public static final String OUTPUT_Q="Q";
+ public static final String OUTPUT_BT = "Bt";
+
+ public static class QJobKeyWritable implements
+ WritableComparable<QJobKeyWritable> {
+
+ private int taskId;
+ private int taskRowOrdinal;
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ taskId = in.readInt();
+ taskRowOrdinal = in.readInt();
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ out.writeInt(taskId);
+ out.writeInt(taskRowOrdinal);
+ }
+
+ @Override
+ public int compareTo(QJobKeyWritable o) {
+ if (taskId < o.taskId)
+ return -1;
+ else if (taskId > o.taskId)
+ return 1;
+ if (taskRowOrdinal < o.taskRowOrdinal)
+ return -1;
+ else if (taskRowOrdinal > o.taskRowOrdinal)
+ return 1;
+ return 0;
+ }
+
+ }
+
+ public static class QMapper extends
+ Mapper<Writable, VectorWritable, QJobKeyWritable, VectorWritable> {
+
+ private int kp;
+ private Omega omega;
+ private List<double[]> yLookahead;
+ private GivensThinSolver qSolver;
+ private int blockCnt;
+ // private int m_reducerCount;
+ private int r;
+ private DenseBlockWritable value = new DenseBlockWritable();
+ private QJobKeyWritable key = new QJobKeyWritable();
+ private IntWritable tempKey = new IntWritable();
+ private MultipleOutputs outputs;
+ private LinkedList<Closeable> closeables = new LinkedList<Closeable>();
+ private SequenceFile.Writer tempQw;
+ private Path tempQPath;
+ private List<UpperTriangular> rSubseq = new ArrayList<UpperTriangular>();
+
+ private void flushSolver(Context context) throws IOException,
+ InterruptedException {
+ UpperTriangular r = qSolver.getRTilde();
+ double[][] qt = qSolver.getThinQtTilde();
+
+ rSubseq.add(r);
+
+ value.setBlock(qt);
+      getTempQw(context).append(tempKey, value);
+      // this probably should be a sparse row matrix, but the compressor
+      // should handle it for disk, and in memory we want it dense anyway;
+      // sparse random implementations would be mostly a memory management
+      // disaster consisting of rehashes and GC thrashing (IMHO)
+ value.setBlock(null);
+ qSolver.reset();
+ }
+
+ // second pass to run a modified version of computeQHatSequence.
+ @SuppressWarnings("unchecked")
+ private void flushQBlocks(Context ctx) throws IOException,
+ InterruptedException {
+ if (blockCnt == 1) {
+        // only one block, no temp file, no second pass; this should be the
+        // default mode for efficiency in most cases. Surely the mapper should
+        // be able to load the entire split in memory -- and we don't even
+        // require that.
+ value.setBlock(qSolver.getThinQtTilde());
+ outputs.getCollector(OUTPUT_QHAT, null).collect(key, value);
+ outputs.getCollector(OUTPUT_R, null).collect(
+ key,
+ new VectorWritable(new DenseVector(qSolver.getRTilde().getData(),
+ true)));
+
+ } else
+ secondPass(ctx);
+ }
+
+ @SuppressWarnings("unchecked")
+ private void secondPass(Context ctx) throws IOException,
+ InterruptedException {
+ qSolver = null; // release mem
+ FileSystem localFs = FileSystem.getLocal(ctx.getConfiguration());
+ SequenceFile.Reader tempQr = new SequenceFile.Reader(localFs,
+ tempQPath, ctx.getConfiguration());
+ closeables.addFirst(tempQr);
+ int qCnt = 0;
+ while (tempQr.next(tempKey, value)) {
+ value
+ .setBlock(GivensThinSolver.computeQtHat(value.getBlock(), qCnt,
+ new GivensThinSolver.DeepCopyUTIterator(rSubseq.iterator())));
+ if (qCnt == 1) // just merge r[0] <- r[1] so it doesn't have to repeat
+ // in subsequent computeQHat iterators
+ GivensThinSolver.mergeR(rSubseq.get(0), rSubseq.remove(1));
+
+ else
+ qCnt++;
+ outputs.getCollector(OUTPUT_QHAT, null).collect(key, value);
+ }
+
+ assert rSubseq.size() == 1;
+
+ // m_value.setR(m_rSubseq.get(0));
+ outputs.getCollector(OUTPUT_R, null)
+ .collect(
+ key,
+ new VectorWritable(new DenseVector(rSubseq.get(0).getData(),
+ true)));
+
+ }
+
+ @Override
+ protected void map(Writable key, VectorWritable value, Context context)
+ throws IOException, InterruptedException {
+ double[] yRow = null;
+ if (yLookahead.size() == kp) {
+ if (qSolver.isFull()) {
+
+ flushSolver(context);
+ blockCnt++;
+
+ }
+ yRow = yLookahead.remove(0);
+
+ qSolver.appendRow(yRow);
+ } else
+ yRow = new double[kp];
+ omega.computeYRow(value.get(), yRow);
+ yLookahead.add(yRow);
+ }
+
+ @Override
+ protected void setup(final Context context) throws IOException,
+ InterruptedException {
+
+ int k = Integer.parseInt(context.getConfiguration().get(PROP_K));
+ int p = Integer.parseInt(context.getConfiguration().get(PROP_P));
+ kp = k + p;
+ long omegaSeed = Long.parseLong(context.getConfiguration().get(
+ PROP_OMEGA_SEED));
+ r = Integer.parseInt(context.getConfiguration()
+ .get(PROP_AROWBLOCK_SIZE));
+ omega = new Omega(omegaSeed, k, p);
+ yLookahead = new ArrayList<double[]>(kp);
+ qSolver = new GivensThinSolver(r, kp);
+ outputs = new MultipleOutputs(new JobConf(context.getConfiguration()));
+ closeables.addFirst(new Closeable() {
+ @Override
+ public void close() throws IOException {
+ outputs.close();
+ }
+ });
+
+ }
+
+ @Override
+ protected void cleanup(Context context) throws IOException,
+ InterruptedException {
+ try {
+ if (qSolver == null && yLookahead.size() == 0)
+ return;
+ if (qSolver == null)
+ qSolver = new GivensThinSolver(yLookahead.size(), kp);
+ // grow q solver up if necessary
+
+ qSolver.adjust(qSolver.getCnt() + yLookahead.size());
+ while (yLookahead.size() > 0) {
+
+ qSolver.appendRow(yLookahead.remove(0));
+
+ }
+ assert qSolver.isFull();
+ if (++blockCnt > 1) {
+ flushSolver(context);
+ assert tempQw != null;
+ closeables.remove(tempQw);
+ tempQw.close();
+ }
+ flushQBlocks(context);
+
+ } finally {
+ IOUtils.close(closeables);
+ }
+
+ }
+
+ private SequenceFile.Writer getTempQw(Context context) throws IOException {
+ if (tempQw == null) {
+ // temporary Q output
+      // hopefully it will not exceed the size of the IO cache, in which case
+      // it is all good since it is going to be managed by the kernel, not the
+      // java GC. And if the IO cache is not good enough, then at least the
+      // access is always sequential.
+ String taskTmpDir = System.getProperty("java.io.tmpdir");
+ FileSystem localFs = FileSystem.getLocal(context.getConfiguration());
+ tempQPath = new Path(new Path(taskTmpDir), "q-temp.seq");
+ tempQw = SequenceFile.createWriter(localFs,
+ context.getConfiguration(), tempQPath, IntWritable.class,
+ DenseBlockWritable.class, CompressionType.BLOCK);
+ closeables.addFirst(tempQw);
+ // delete the temp file on close; note that the file path comes from
+ // tempQPath, not from the writer's toString()
+ closeables.addFirst(new IOUtils.DeleteFileOnClose(new File(tempQPath
+ .toString())));
+
+ }
+ return tempQw;
+ }
+ }
+
+ public static void run(Configuration conf, Path[] inputPaths,
+ Path outputPath, int aBlockRows, int minSplitSize, int k, int p,
+ long seed, int numReduceTasks) throws ClassNotFoundException,
+ InterruptedException, IOException {
+
+ JobConf oldApiJob = new JobConf(conf);
+ MultipleOutputs.addNamedOutput(oldApiJob, OUTPUT_QHAT,
+ org.apache.hadoop.mapred.SequenceFileOutputFormat.class,
+ QJobKeyWritable.class, DenseBlockWritable.class);
+ MultipleOutputs.addNamedOutput(oldApiJob, OUTPUT_R,
+ org.apache.hadoop.mapred.SequenceFileOutputFormat.class,
+ QJobKeyWritable.class, VectorWritable.class);
+
+ Job job = new Job(oldApiJob);
+ job.setJobName("Q-job");
+ job.setJarByClass(QJob.class);
+
+ job.setInputFormatClass(SequenceFileInputFormat.class);
+ FileInputFormat.setInputPaths(job, inputPaths);
+ if (minSplitSize > 0)
+ SequenceFileInputFormat.setMinInputSplitSize(job, minSplitSize);
+
+ FileOutputFormat.setOutputPath(job, outputPath);
+
+ SequenceFileOutputFormat.setCompressOutput(job, true);
+ SequenceFileOutputFormat.setOutputCompressorClass(job, DefaultCodec.class);
+ SequenceFileOutputFormat.setOutputCompressionType(job,
+ CompressionType.BLOCK);
+
+ job.setMapOutputKeyClass(QJobKeyWritable.class);
+ job.setMapOutputValueClass(VectorWritable.class);
+
+ job.setOutputKeyClass(QJobKeyWritable.class);
+ job.setOutputValueClass(VectorWritable.class);
+
+ job.setMapperClass(QMapper.class);
+
+ job.getConfiguration().setInt(PROP_AROWBLOCK_SIZE, aBlockRows);
+ job.getConfiguration().setLong(PROP_OMEGA_SEED, seed);
+ job.getConfiguration().setInt(PROP_K, k);
+ job.getConfiguration().setInt(PROP_P, p);
+
+ // the number of reduce tasks doesn't matter: we don't actually send
+ // anything to reducers. the only reason to configure a reduce step at
+ // all would be so that combiners can fire, so reduce here is purely
+ // symbolic -- hence the hardcoded 0 below.
+ job.setNumReduceTasks(0 /* numReduceTasks */);
+
+ job.submit();
+ job.waitForCompletion(false);
+
+ if (!job.isSuccessful())
+ throw new IOException("Q job unsuccessful.");
+
+ }
+
+ public enum QJobCntEnum {
+ NUM_Q_BLOCKS
+ }
+
+}
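
A note on the mapper above: it streams rows of A, projects each against the
seeded random matrix Omega (omega.computeYRow) to produce a row of Y = A*Omega,
and feeds Y rows into the blocked Givens QR solver. As a rough, self-contained
illustration of just the projection step -- not Mahout code; the hash used to
derive Omega entries below is a made-up stand-in, Mahout's Omega has its own
generation scheme -- the per-row computation amounts to:

import java.util.Arrays;
import java.util.Random;

public class YRowSketch {

  // regenerate entry (i, j) of Omega from the seed on the fly, so the
  // n x (k+p) matrix never needs to be materialized; this hash is a
  // stand-in, not the scheme Mahout's Omega actually uses
  static double omegaEntry(long seed, int i, int j) {
    return new Random(seed ^ (31L * i + j)).nextGaussian();
  }

  // yRow = aRow * Omega, computed one A-row at a time as in QMapper.map()
  static void computeYRow(double[] aRow, double[] yRow, long seed) {
    Arrays.fill(yRow, 0.0);
    for (int i = 0; i < aRow.length; i++)
      for (int j = 0; j < yRow.length; j++)
        yRow[j] += aRow[i] * omegaEntry(seed, i, j);
  }

  public static void main(String[] args) {
    double[] aRow = { 1, 2, 3, 4 };
    double[] yRow = new double[2]; // k + p = 2, just for the demo
    computeYRow(aRow, yRow, 0xdeadL);
    System.out.println(Arrays.toString(yRow));
  }
}
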
Added: mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/SSVDCli.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/SSVDCli.java?rev=1078694&view=auto
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/SSVDCli.java (added)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/SSVDCli.java Mon Mar 7 06:34:12 2011
@@ -0,0 +1,126 @@
+/* Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.hadoop.stochasticsvd;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.mahout.common.AbstractJob;
+import org.apache.mahout.math.DenseVector;
+import org.apache.mahout.math.VectorWritable;
+
+/**
+ * Mahout CLI adapter for SSVDSolver.
+ */
+public class SSVDCli extends AbstractJob {
+
+ @Override
+ public int run(String[] args) throws Exception {
+ addInputOption();
+ addOutputOption();
+ addOption("rank", "k", "decomposition rank", true);
+ addOption("oversampling", "p", "oversampling", true);
+ addOption("blockHeight", "r", "Y block height (must be > (k+p))", true);
+ addOption("minSplitSize", "s", "minimum split size", "-1");
+ addOption("computeU", "U", "compute U (true/false)", "true");
+ addOption("uHalfSigma", "uhs", "Compute U as UHat=U x pow(Sigma,0.5)",
+ "false");
+ addOption("computeV", "V", "compute V (true/false)", "true");
+ addOption("vHalfSigma", "vhs", "compute V as VHat= V x pow(Sigma,0.5)",
+ "false");
+ addOption("reduceTasks", "t", "number of reduce tasks (where applicable)",
+ "1");
+
+ Map<String, String> pargs = parseArguments(args);
+ if (pargs == null)
+ return -1;
+
+ String input = pargs.get("--input");
+ String output = pargs.get("--output");
+ String tempDir = pargs.get("--tempDir");
+ int k = Integer.parseInt(pargs.get("--rank"));
+ int p = Integer.parseInt(pargs.get("--oversampling"));
+ int r = Integer.parseInt(pargs.get("--blockHeight"));
+ int minSplitSize = Integer.parseInt(pargs.get("--minSplitSize"));
+ boolean computeU = Boolean.parseBoolean(pargs.get("--computeU"));
+ boolean computeV = Boolean.parseBoolean(pargs.get("--computeV"));
+ boolean cUHalfSigma = Boolean.parseBoolean(pargs.get("--uHalfSigma"));
+ boolean cVHalfSigma = Boolean.parseBoolean(pargs.get("--vHalfSigma"));
+ int reduceTasks = Integer.parseInt(pargs.get("--reduceTasks"));
+
+ Configuration conf = getConf();
+ if (conf == null)
+ throw new IOException("No Hadoop configuration present");
+
+ SSVDSolver solver = new SSVDSolver(conf, new Path[] { new Path(input) },
+ new Path(tempDir), r, k, p, reduceTasks);
+ solver.setMinSplitSize(minSplitSize);
+ solver.setComputeU(computeU);
+ solver.setComputeV(computeV);
+ solver.setcUHalfSigma(cUHalfSigma);
+ solver.setcVHalfSigma(cVHalfSigma);
+
+ solver.run();
+
+ // housekeeping
+ FileSystem fs = FileSystem.get(conf);
+
+ Path outPath = new Path(output);
+ fs.mkdirs(outPath);
+
+ SequenceFile.Writer sigmaW = SequenceFile.createWriter(fs, conf, new Path(
+ outPath, "sigma"), NullWritable.class, VectorWritable.class);
+ try {
+ VectorWritable sValues = new VectorWritable(new DenseVector(
+ Arrays.copyOf(solver.getSingularValues(), k), true));
+ sigmaW.append(NullWritable.get(), sValues);
+
+ } finally {
+ sigmaW.close();
+ }
+
+ if (computeU) {
+ FileStatus[] uFiles = fs.globStatus(new Path(solver.getUPath()));
+ if (uFiles != null)
+ for (FileStatus uf : uFiles)
+ fs.rename(uf.getPath(), outPath);
+ }
+ if (computeV) {
+ FileStatus[] vFiles = fs.globStatus(new Path(solver.getVPath()));
+ if (vFiles != null)
+ for (FileStatus vf : vFiles)
+ fs.rename(vf.getPath(), outPath);
+
+ }
+ return 0;
+ }
+
+ public static void main(String[] args) throws Exception {
+ // the JVM entry point must return void; propagate the exit code instead
+ System.exit(ToolRunner.run(new SSVDCli(), args));
+ }
+
+}
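
For reference, the CLI above can also be driven programmatically through
Hadoop's ToolRunner (SSVDCli extends AbstractJob, which implements Tool). A
hypothetical invocation -- all paths are placeholders and the flag values are
arbitrary -- might look like:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ToolRunner;
import org.apache.mahout.math.hadoop.stochasticsvd.SSVDCli;

public class SSVDCliExample {
  public static void main(String[] args) throws Exception {
    String[] cliArgs = {
        "--input", "/tmp/ssvd/A",      // placeholder DFS path
        "--output", "/tmp/ssvd/out",   // placeholder DFS path
        "--tempDir", "/tmp/ssvd/work", // placeholder DFS path
        "--rank", "10",                // k
        "--oversampling", "5",         // p
        "--blockHeight", "1000",       // r; must be > k + p
        "--reduceTasks", "4"
    };
    // exit code is 0 on success, -1 if argument parsing fails
    System.exit(ToolRunner.run(new Configuration(), new SSVDCli(), cliArgs));
  }
}
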
Added: mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/SSVDPrototype.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/SSVDPrototype.java?rev=1078694&view=auto
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/SSVDPrototype.java (added)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/SSVDPrototype.java Mon Mar 7 06:34:12 2011
@@ -0,0 +1,383 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.hadoop.stochasticsvd;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+
+import org.apache.mahout.math.DenseMatrix;
+import org.apache.mahout.math.DenseVector;
+import org.apache.mahout.math.SingularValueDecomposition;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.ssvd.EigenSolverWrapper;
+
+/**
+ * SSVD prototype: non-MR concept verification for Givens QR & SSVD basic
+ * algorithms.
+ *
+ * @author Dmitriy
+ */
+public class SSVDPrototype {
+
+
+ private Omega omega;
+ private int kp; // k+p
+ private GivensThinSolver qSolver;
+ private double[] yRow;
+ private int cnt;
+ private int blckCnt;
+ private int r;
+ private List<UpperTriangular> rBlocks = new ArrayList<UpperTriangular>();
+ private List<double[][]> qtBlocks = new ArrayList<double[][]>();
+ private List<double[]> yLookahead;
+
+ public SSVDPrototype(long seed, int kp, int r) {
+ super();
+ this.kp = kp;
+ omega = new Omega(seed, kp / 2, kp - (kp / 2));
+ yRow = new double[kp];
+ // m_yRowV = new DenseVector(m_yRow,true);
+ this.r = r;
+ yLookahead = new ArrayList<double[]>(kp);
+ }
+
+ void firstPass(int aRowId, Vector aRow) throws IOException {
+
+ omega.computeYRow(aRow, yRow);
+
+ yLookahead.add(yRow.clone()); // bad for GC, but it's just a prototype;
+ // in the real thing we'll rotate the use
+ // of the y buffer
+
+ while (yLookahead.size() > kp) {
+
+ if (qSolver == null)
+ qSolver = new GivensThinSolver(r, kp);
+
+ qSolver.appendRow(yLookahead.remove(0));
+ if (qSolver.isFull()) {
+ UpperTriangular r = qSolver.getRTilde();
+ double[][] qt = qSolver.getThinQtTilde();
+ qSolver = null;
+ qtBlocks.add(qt);
+ rBlocks.add(r);
+ }
+
+ }
+ cnt++;
+ }
+
+ void finishFirstPass() {
+
+ if (qSolver == null && yLookahead.size() == 0)
+ return;
+ if (qSolver == null)
+ qSolver = new GivensThinSolver(yLookahead.size(), kp);
+ // grow q solver up if necessary
+
+ qSolver.adjust(qSolver.getCnt() + yLookahead.size());
+ while (yLookahead.size() > 0) {
+
+ qSolver.appendRow(yLookahead.remove(0));
+ if (qSolver.isFull()) {
+ UpperTriangular r = qSolver.getRTilde();
+ double[][] qt = qSolver.getThinQtTilde();
+ qSolver = null;
+ qtBlocks.add(qt);
+ rBlocks.add(r);
+ }
+
+ }
+
+ // simulate reducers -- produce qHats
+ for (int i = 0; i < rBlocks.size(); i++)
+ qtBlocks.set(i, GivensThinSolver.computeQtHat(qtBlocks.get(i), i,
+ new DeepCopyIterator(rBlocks.listIterator())));
+ cnt = 0;
+ blckCnt = 0;
+ }
+
+ void secondPass(int aRowId, Vector aRow, PartialRowEmitter btEmitter)
+ throws IOException {
+ int n = aRow.size();
+ double[][] qtHat = qtBlocks.get(blckCnt);
+
+ int r = qtHat[0].length;
+ int qRowBlckIndex = r - cnt - 1; // <-- reverse order since we fed A in
+ // reverse
+ double[] qRow = new double[kp];
+ for (int i = 0; i < kp; i++)
+ qRow[i] = qtHat[i][qRowBlckIndex];
+ Vector qRowV = new DenseVector(qRow, true);
+
+ if (++cnt == r) {
+ blckCnt++;
+ cnt = 0;
+ }
+
+ for (int i = 0; i < n; i++)
+ btEmitter.emitRow(i, qRowV.times(aRow.getQuick(i)));
+
+ }
+
+ private static class DeepCopyIterator implements Iterator<UpperTriangular> {
+
+ private Iterator<UpperTriangular> delegate;
+
+ public DeepCopyIterator(Iterator<UpperTriangular> del) {
+ super();
+ delegate = del;
+ }
+
+ public boolean hasNext() {
+ return delegate.hasNext();
+ }
+
+ public UpperTriangular next() {
+
+ return new UpperTriangular(delegate.next());
+ }
+
+ public void remove() {
+ delegate.remove();
+ }
+
+ }
+
+ public static void testThinQr(int dims, int kp, final long rndSeed)
+ throws Exception {
+
+ DenseMatrix mx = new DenseMatrix(dims << 2, dims);
+ // mx.assign(new UnaryFunction() {
+ //
+ // Random m_rnd = new Random(rndSeed);
+ //
+ // @Override
+ // public double apply(double arg0) {
+ // return m_rnd.nextDouble()*1000;
+ // }
+ // });
+
+ Random rnd = new Random();
+ for (int i = 0; i < mx.rowSize(); i++)
+ for (int j = 0; j < mx.columnSize(); j++)
+ mx.set(i, j, rnd.nextDouble() * 1000);
+
+ mx.setQuick(0, 0, 1);
+ mx.setQuick(0, 1, 2);
+ mx.setQuick(0, 2, 3);
+ mx.setQuick(1, 0, 4);
+ mx.setQuick(1, 1, 5);
+ mx.setQuick(1, 2, 6);
+ mx.setQuick(2, 0, 7);
+ mx.setQuick(2, 1, 8);
+ mx.setQuick(2, 2, 9);
+
+ SingularValueDecomposition svd2 = new SingularValueDecomposition(mx);
+ double[] svaluesControl = svd2.getSingularValues();
+
+ for (int i = 0; i < kp; i++)
+ System.out.printf("%.3e ", svaluesControl[i]);
+ System.out.println();
+
+ int m = mx.rowSize(); /* ,n=mx.columnSize(); */
+
+ long seed = new Random().nextLong();
+
+ final Map<Integer, Vector> btRows = new HashMap<Integer, Vector>();
+
+ PartialRowEmitter btEmitter = new PartialRowEmitter() {
+ @Override
+ public void emitRow(int rowNum, Vector row) throws IOException {
+ Vector btRow = btRows.get(rowNum);
+ if (btRow != null)
+ row.addTo(btRow);
+ btRows.put(rowNum, btRow == null ? new DenseVector(row) : btRow);
+ }
+ };
+
+ SSVDPrototype mapperSimulation = new SSVDPrototype(seed, kp, 3000);
+ for (int i = 0; i < m; i++)
+ mapperSimulation.firstPass(i, mx.getRow(i));
+
+ mapperSimulation.finishFirstPass();
+
+ for (int i = 0; i < m; i++)
+ mapperSimulation.secondPass(i, mx.getRow(i), btEmitter);
+
+ // LocalSSVDTest.assertOrthonormality(mapperSimulation.m_qt.transpose(),
+ // false,1e-10);
+
+ // reconstruct bbt
+ final Map<Integer, Vector> bbt = new HashMap<Integer, Vector>();
+ PartialRowEmitter bbtEmitter = new PartialRowEmitter() {
+
+ @Override
+ public void emitRow(int rowNum, Vector row) throws IOException {
+ Vector bbtRow = bbt.get(rowNum);
+ if (bbtRow != null)
+ row.addTo(bbtRow);
+ bbt.put(rowNum, bbtRow == null ? new DenseVector(row) : bbtRow);
+ }
+ };
+
+ for (Map.Entry<Integer, Vector> btRowEntry : btRows.entrySet()) {
+ Vector btRow = btRowEntry.getValue();
+ assert btRow.size() == kp;
+ for (int i = 0; i < kp; i++)
+ bbtEmitter.emitRow(i, btRow.times(btRow.getQuick(i)));
+ }
+
+ double[][] bbtValues = new double[kp][];
+ for (int i = 0; i < kp; i++) {
+ bbtValues[i] = new double[kp];
+ Vector bbtRow = bbt.get(i);
+ for (int j = 0; j < kp; j++)
+ bbtValues[i][j] = bbtRow.getQuick(j);
+ }
+
+ EigenSolverWrapper eigenWrapper = new EigenSolverWrapper(bbtValues);
+ double[] eigenva2 = eigenWrapper.getEigenValues();
+ double[] svalues = new double[kp];
+ for (int i = 0; i < kp; i++)
+ svalues[i] = Math.sqrt(eigenva2[i]); // eigenvalues of B*Bt are squared singular values
+
+ for (int i = 0; i < kp; i++)
+ System.out.printf("%.3e ", svalues[i]);
+ System.out.println();
+
+ }
+
+ public static void testBlockQrWithSSVD(int dims, int kp, int r,
+ final long rndSeed) throws Exception {
+
+ DenseMatrix mx = new DenseMatrix(dims << 2, dims);
+ // mx.assign(new UnaryFunction() {
+ //
+ // Random m_rnd = new Random(rndSeed);
+ //
+ // @Override
+ // public double apply(double arg0) {
+ // return (m_rnd.nextDouble()-0.5)*1000;
+ // }
+ // });
+
+ Random rnd = new Random();
+ for (int i = 0; i < mx.rowSize(); i++)
+ for (int j = 0; j < mx.columnSize(); j++)
+ mx.set(i, j, (rnd.nextDouble() - 0.5) * 1000);
+ mx.setQuick(0, 0, 1);
+ mx.setQuick(0, 1, 2);
+ mx.setQuick(0, 2, 3);
+ mx.setQuick(1, 0, 4);
+ mx.setQuick(1, 1, 5);
+ mx.setQuick(1, 2, 6);
+ mx.setQuick(2, 0, 7);
+ mx.setQuick(2, 1, 8);
+ mx.setQuick(2, 2, 9);
+
+ SingularValueDecomposition svd2 = new SingularValueDecomposition(mx);
+ double[] svaluesControl = svd2.getSingularValues();
+
+ for (int i = 0; i < kp; i++)
+ System.out.printf("%e ", svaluesControl[i]);
+ System.out.println();
+
+ int m = mx.rowSize(); /* ,n=mx.columnSize(); */
+
+ final Map<Integer, Vector> btRows = new HashMap<Integer, Vector>();
+
+ PartialRowEmitter btEmitter = new PartialRowEmitter() {
+ @Override
+ public void emitRow(int rowNum, Vector row) throws IOException {
+ Vector btRow = btRows.get(rowNum);
+ if (btRow != null)
+ row.addTo(btRow);
+ btRows.put(rowNum, btRow == null ? new DenseVector(row) : btRow);
+ }
+ };
+
+ SSVDPrototype mapperSimulation = new SSVDPrototype(rndSeed, kp, r);
+ for (int i = 0; i < m; i++)
+ mapperSimulation.firstPass(i, mx.getRow(i));
+
+ mapperSimulation.finishFirstPass();
+
+ for (int i = 0; i < m; i++)
+ mapperSimulation.secondPass(i, mx.getRow(i), btEmitter);
+
+ // LocalSSVDTest.assertOrthonormality(mapperSimulation.m_qt.transpose(),
+ // false,1e-10);
+
+ // reconstruct bbt
+ final Map<Integer, Vector> bbt = new HashMap<Integer, Vector>();
+ PartialRowEmitter bbtEmitter = new PartialRowEmitter() {
+
+ @Override
+ public void emitRow(int rowNum, Vector row) throws IOException {
+ Vector bbtRow = bbt.get(rowNum);
+ if (bbtRow != null)
+ row.addTo(bbtRow);
+ bbt.put(rowNum, bbtRow == null ? new DenseVector(row) : bbtRow);
+ }
+ };
+
+ for (Map.Entry<Integer, Vector> btRowEntry : btRows.entrySet()) {
+ Vector btRow = btRowEntry.getValue();
+ assert btRow.size() == kp;
+ for (int i = 0; i < kp; i++)
+ bbtEmitter.emitRow(i, btRow.times(btRow.getQuick(i)));
+ }
+
+ double[][] bbtValues = new double[kp][];
+ for (int i = 0; i < kp; i++) {
+ bbtValues[i] = new double[kp];
+ Vector bbtRow = bbt.get(i);
+ for (int j = 0; j < kp; j++)
+ bbtValues[i][j] = bbtRow.getQuick(j);
+ }
+
+ EigenSolverWrapper eigenWrapper = new EigenSolverWrapper(bbtValues);
+
+ double[] eigenva2 = eigenWrapper.getEigenValues();
+ double[] svalues = new double[kp];
+ for (int i = 0; i < kp; i++)
+ svalues[i] = Math.sqrt(eigenva2[i]); // eigenvalues of B*Bt are squared singular values
+
+ for (int i = 0; i < kp; i++)
+ System.out.printf("%e ", svalues[i]);
+ System.out.println();
+ }
+
+ public static void main(String[] args) throws Exception {
+ // testThinQr();
+ long seed = new Random().nextLong();
+ testBlockQrWithSSVD(200, 200, 800, seed);
+ testBlockQrWithSSVD(200, 20, 800, seed);
+ testBlockQrWithSSVD(200, 20, 850, seed); // test trimming
+ testBlockQrWithSSVD(200, 20, 90, seed);
+ testBlockQrWithSSVD(200, 20, 99, seed);
+ }
+
+}
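
A closing note on the Math.sqrt(eigenva2[i]) lines in both test methods above:
with Y = A*Omega, thin QR factorization Y = Q*R, and B = Qt*A (whose transposed
rows the btEmitter accumulates), the BBt matrix that EigenSolverWrapper
decomposes satisfies, in LaTeX notation,

\[
  B = Q^{\top}A = U \Sigma V^{\top}
  \quad\Longrightarrow\quad
  BB^{\top} = U \Sigma V^{\top} V \Sigma U^{\top} = U \Sigma^{2} U^{\top},
\]
\[
  \sigma_i(A) \approx \sigma_i(B) = \sqrt{\lambda_i\bigl(BB^{\top}\bigr)},
\]

so the eigenvalues of BBt are the squared singular values of B, which in turn
approximate the top singular values of A; taking square roots is the right
move.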