Posted to commits@mahout.apache.org by dl...@apache.org on 2011/03/07 07:34:13 UTC

svn commit: r1078694 [1/2] - in /mahout/trunk: core/ core/src/main/java/org/apache/mahout/common/ core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/ core/src/test/java/org/apache/mahout/math/hadoop/stochasticsvd/ math/ math/src/main/java/o...

Author: dlyubimov
Date: Mon Mar  7 06:34:12 2011
New Revision: 1078694

URL: http://svn.apache.org/viewvc?rev=1078694&view=rev
Log:
MAHOUT-593: initial addition of Stochastic SVD method (related issue is MAHOUT-376)

Added:
    mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/
    mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/BBtJob.java
    mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/BtJob.java
    mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/DenseBlockWritable.java
    mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/GivensThinSolver.java
    mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/Omega.java
    mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/PartialRowEmitter.java
    mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/QJob.java
    mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/SSVDCli.java
    mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/SSVDPrototype.java
    mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/SSVDSolver.java
    mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/UJob.java
    mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/UpperTriangular.java
    mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/VJob.java
    mahout/trunk/core/src/test/java/org/apache/mahout/math/hadoop/stochasticsvd/
    mahout/trunk/core/src/test/java/org/apache/mahout/math/hadoop/stochasticsvd/LocalSSVDSolverTest.java
    mahout/trunk/core/src/test/java/org/apache/mahout/math/hadoop/stochasticsvd/SSVDPrototypeTest.java
    mahout/trunk/math/src/main/java/org/apache/mahout/math/ssvd/
    mahout/trunk/math/src/main/java/org/apache/mahout/math/ssvd/EigenSolverWrapper.java
    mahout/trunk/src/conf/ssvd.props
Modified:
    mahout/trunk/core/pom.xml
    mahout/trunk/core/src/main/java/org/apache/mahout/common/IOUtils.java
    mahout/trunk/math/pom.xml
    mahout/trunk/src/conf/driver.classes.props

Modified: mahout/trunk/core/pom.xml
URL: http://svn.apache.org/viewvc/mahout/trunk/core/pom.xml?rev=1078694&r1=1078693&r2=1078694&view=diff
==============================================================================
--- mahout/trunk/core/pom.xml (original)
+++ mahout/trunk/core/pom.xml Mon Mar  7 06:34:12 2011
@@ -212,9 +212,11 @@
     </dependency>
 
     <dependency>
-      <groupId>commons-math</groupId>
+<!--      <groupId>commons-math</groupId>-->
+      <groupId>org.apache.commons</groupId>
       <artifactId>commons-math</artifactId>
-      <version>1.2</version>
+      <!--  version>1.2</version -->
+      <version>2.1</version>
     </dependency>
 
     <dependency>

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/common/IOUtils.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/common/IOUtils.java?rev=1078694&r1=1078693&r2=1078694&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/common/IOUtils.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/common/IOUtils.java Mon Mar  7 06:34:12 2011
@@ -18,11 +18,13 @@
 package org.apache.mahout.common;
 
 import java.io.Closeable;
+import java.io.File;
 import java.io.IOException;
 import java.sql.Connection;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
+import java.util.Collection;
 
 import com.google.common.base.Preconditions;
 import org.slf4j.Logger;
@@ -101,4 +103,92 @@ public final class IOUtils {
     quietClose(connection);
   }
   
+  /**
+   * Closes all given resources, logs any problems that occurred, clears
+   * <code>closeables</code> (to prevent repeated close attempts) and re-throws
+   * the last exception at the end. Helps with resource scope management (e.g.
+   * compositions of {@link Closeable} objects).
+   * <p>
+   * Typical pattern:
+   * 
+   * <pre>
+   *   LinkedList<Closeable> closeables = new LinkedList<Closeable>();
+   *   try {
+   *      InputStream stream1 = new FileInputStream(...);
+   *      closeables.addFirst(stream1);
+   *      ...
+   *      InputStream streamN = new FileInputStream(...);
+   *      closeables.addFirst(streamN);
+   *      ...
+   *   } finally {
+   *      IOUtils.close(closeables);
+   *   }
+   * </pre>
+   * 
+   * @param closeables
+   *          must be a modifiable collection of {@link Closeable}s
+   * @throws IOException
+   *           the last exception (if any) of all closed resources
+   * 
+   * 
+   */
+  public static void close(Collection<? extends Closeable> closeables)
+    throws IOException {
+    Throwable lastThr = null;
+
+    for (Closeable closeable : closeables) {
+      try {
+        closeable.close();
+      } catch (Throwable thr) {
+        log.error(thr.getMessage(), thr);
+        lastThr = thr;
+      }
+    }
+
+    // make sure we don't double-close
+    // but that has to be modifiable collection
+    closeables.clear();
+
+    if (lastThr != null) {
+      if (lastThr instanceof IOException) {
+        throw (IOException) lastThr;
+      } else if (lastThr instanceof RuntimeException) {
+        throw (RuntimeException) lastThr;
+      } else if (lastThr instanceof Error) {
+        throw (Error) lastThr;
+      } else {
+        // should not happen
+        throw (IOException) new IOException("Unexpected exception during close")
+            .initCause(lastThr);
+      }
+    }
+
+  }
+
+
+  /**
+   * For temporary files, a file may be treated as a {@link Closeable} too:
+   * the file is wiped on close and thus the disk resource is released
+   * ('closed').
+   */
+  public static class DeleteFileOnClose implements Closeable {
+
+    private File file;
+
+    public DeleteFileOnClose(File file) {
+      this.file = file;
+    }
+
+    @Override
+    public void close() throws IOException {
+      if (file.isFile())
+        file.delete();
+    }
+
+  }
+
 }
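
For illustration, a minimal sketch of how the close() helper and DeleteFileOnClose added above compose; the class name and temp-file prefix here are hypothetical, not part of the patch.

    import java.io.Closeable;
    import java.io.File;
    import java.io.FileInputStream;
    import java.io.IOException;
    import java.io.InputStream;
    import java.util.LinkedList;

    import org.apache.mahout.common.IOUtils;

    public class CloseScopeExample {
      public static void main(String[] args) throws IOException {
        LinkedList<Closeable> closeables = new LinkedList<Closeable>();
        try {
          // hypothetical temp file; registering it puts the disk space in the
          // same close scope as the streams
          File tmp = File.createTempFile("ssvd-example", ".tmp");
          closeables.addFirst(new IOUtils.DeleteFileOnClose(tmp));

          InputStream in = new FileInputStream(tmp);
          closeables.addFirst(in);
          // ... use the stream ...
        } finally {
          // closes everything, logs failures, clears the list, rethrows the last one
          IOUtils.close(closeables);
        }
      }
    }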

Added: mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/BBtJob.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/BBtJob.java?rev=1078694&view=auto
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/BBtJob.java (added)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/BBtJob.java Mon Mar  7 06:34:12 2011
@@ -0,0 +1,188 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.hadoop.stochasticsvd;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
+import org.apache.hadoop.io.compress.DefaultCodec;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.mahout.math.DenseVector;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.VectorWritable;
+
+/**
+ * Compute B*Bt using the simple fact that B*Bt = sum over i of the outer
+ * products of B_(*i) with itself, where B_(*i) is the i-th column of B
+ * (i.e. the i-th row of Bt).
+ */
+public class BBtJob {
+
+  public static final String OUTPUT_BBT = "part";
+
+  public static void run(Configuration conf, Path btPath, Path outputPath,
+      int numReduceTasks) throws IOException, ClassNotFoundException,
+      InterruptedException {
+
+    Job job = new Job(conf);
+    job.setJobName("BBt-job");
+    job.setJarByClass(BBtJob.class);
+
+    // input
+    job.setInputFormatClass(SequenceFileInputFormat.class);
+    FileInputFormat.setInputPaths(job, btPath);
+
+    // map
+    job.setMapOutputKeyClass(IntWritable.class);
+    job.setMapOutputValueClass(VectorWritable.class);
+    job.setMapperClass(BBtMapper.class);
+    job.setReducerClass(BBtReducer.class);
+
+    // combiner and reducer
+    job.setOutputKeyClass(IntWritable.class);
+    job.setOutputValueClass(VectorWritable.class);
+
+    // output
+    job.setOutputFormatClass(SequenceFileOutputFormat.class);
+    FileOutputFormat.setOutputPath(job, outputPath);
+    SequenceFileOutputFormat.setOutputCompressionType(job,
+        CompressionType.BLOCK);
+    SequenceFileOutputFormat.setOutputCompressorClass(job, DefaultCodec.class);
+    job.getConfiguration().set("mapreduce.output.basename", OUTPUT_BBT);
+
+    // run
+    job.submit();
+    job.waitForCompletion(false);
+    if (!job.isSuccessful())
+      throw new IOException("BBt job failed.");
+    return;
+
+  }
+
+  // actually, the B*Bt matrix is small enough that we don't need to rely on
+  // a combiner.
+  public static class BBtMapper extends
+      Mapper<IntWritable, VectorWritable, IntWritable, VectorWritable> {
+
+    private VectorWritable vw = new VectorWritable();
+    private IntWritable iw = new IntWritable();
+    private UpperTriangular bbtPartial; // all partial BBt products are
+                                        // symmetric as well
+
+    @Override
+    protected void map(IntWritable key, VectorWritable value, Context context)
+        throws IOException, InterruptedException {
+      Vector btVec = value.get();
+      int kp = btVec.size();
+      if (bbtPartial == null) {
+        bbtPartial = new UpperTriangular(kp);
+      }
+      for (int i = 0; i < kp; i++) {
+        // this approach should reduce GC churn rate
+        double mul = btVec.getQuick(i);
+        for (int j = i; j < kp; j++)
+          bbtPartial.setQuick(i, j,
+              bbtPartial.getQuick(i, j) + mul * btVec.getQuick(j));
+      }
+    }
+
+    @Override
+    protected void cleanup(Context context) throws IOException,
+        InterruptedException {
+      if (bbtPartial != null) {
+        iw.set(context.getTaskAttemptID().getTaskID().getId());
+        vw.set(new DenseVector(bbtPartial.getData(), true));
+        context.write(iw, vw);
+      }
+      super.cleanup(context);
+    }
+  }
+
+  public static class BBtReducer extends
+      Reducer<IntWritable, VectorWritable, IntWritable, VectorWritable> {
+
+    private double[] accum;
+
+    @Override
+    protected void cleanup(Context context) throws IOException,
+        InterruptedException {
+      try {
+        if (accum != null)
+          context.write(new IntWritable(), new VectorWritable(new DenseVector(
+              accum, true)));
+      } finally {
+        super.cleanup(context);
+      }
+    }
+
+    @Override
+    protected void reduce(IntWritable iw, Iterable<VectorWritable> ivw,
+        Context ctx) throws IOException, InterruptedException {
+      Iterator<VectorWritable> vwIter = ivw.iterator();
+      Vector bbtPartial = vwIter.next().get();
+      if (accum == null) {
+        accum = new double[bbtPartial.size()];
+      }
+      do {
+        for (int i = 0; i < accum.length; i++)
+          accum[i] += bbtPartial.getQuick(i);
+      } while (vwIter.hasNext() && null != (bbtPartial = vwIter.next().get()));
+    }
+
+  }
+
+  // naive mapper.
+  // public static class BBtMapper extends
+  // Mapper<IntWritable, VectorWritable, IntWritable, VectorWritable> {
+  //
+  // private VectorWritable m_vw = new VectorWritable();
+  // private IntWritable m_iw = new IntWritable();
+  // private DenseVector m_v;
+  // double[] m_vRow;
+  //
+  // @Override
+  // protected void map(IntWritable key, VectorWritable value,
+  // Context context) throws IOException, InterruptedException {
+  // Vector btVec = value.get();
+  // int kp = btVec.size();
+  // if (m_v == null) {
+  // m_v = new DenseVector(m_vRow = new double[kp], true);
+  // m_vw.set(m_v);
+  // }
+  // for (int i = 0; i < kp; i++) {
+  // // this approach should reduce GC churn rate
+  // double mul = btVec.getQuick(i);
+  // for (int j = 0; j < kp; j++)
+  // m_vRow[j] = mul * btVec.getQuick(j);
+  // m_iw.set(i);
+  // context.write(m_iw, m_vw);
+  // }
+  // }
+  // }
+
+}
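
For illustration, a plain-array sketch (not from this patch) of the accumulation BBtMapper performs: each row of Bt is a column of B, and B*Bt is the sum of the outer products of those columns, so only the upper triangle needs to be accumulated.

    public class BBtSketch {

      // accumulate the upper triangle of B*Bt from rows of Bt
      // (each Bt row is a column of B); the result is symmetric, so the
      // lower triangle is implied
      static double[][] bbtUpper(double[][] btRows) {
        int kp = btRows[0].length;
        double[][] bbt = new double[kp][kp];
        for (double[] btRow : btRows)
          for (int i = 0; i < kp; i++) {
            double mul = btRow[i];
            for (int j = i; j < kp; j++)
              bbt[i][j] += mul * btRow[j];
          }
        return bbt;
      }

      public static void main(String[] args) {
        double[][] bt = { { 1, 2 }, { 3, 4 }, { 5, 6 } }; // 3 rows of Bt, k+p = 2
        double[][] bbt = bbtUpper(bt);
        System.out.println(bbt[0][0] + " " + bbt[0][1] + " " + bbt[1][1]); // 35.0 44.0 56.0
      }
    }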

Added: mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/BtJob.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/BtJob.java?rev=1078694&view=auto
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/BtJob.java (added)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/BtJob.java Mon Mar  7 06:34:12 2011
@@ -0,0 +1,303 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.hadoop.stochasticsvd;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.compress.DefaultCodec;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.lib.MultipleOutputs;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.mahout.math.DenseVector;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.VectorWritable;
+import org.apache.mahout.math.hadoop.stochasticsvd.QJob.QJobKeyWritable;
+
+/**
+ * Bt job. For details, see working notes in MAHOUT-376. 
+ *
+ */
+@SuppressWarnings("deprecation")
+public class BtJob {
+
+  public static final String OUTPUT_Q = "Q";
+  public static final String OUTPUT_BT = "part";
+  public static final String PROP_QJOB_PATH = "ssvd.QJob.path";
+
+  public static class BtMapper extends
+      Mapper<Writable, VectorWritable, IntWritable, VectorWritable> {
+
+    private SequenceFile.Reader qInput;
+    private List<UpperTriangular> mRs = new ArrayList<UpperTriangular>();
+    private int blockNum;
+    private double[][] mQt;
+    private int cnt;
+    private int r;
+    private MultipleOutputs outputs;
+    private IntWritable btKey = new IntWritable();
+    private VectorWritable btValue = new VectorWritable();
+    private int kp;
+    private VectorWritable qRowValue = new VectorWritable();
+    private int qCount; // debug
+
+    void loadNextQt(Context ctx) throws IOException, InterruptedException {
+      QJobKeyWritable key = new QJobKeyWritable();
+      DenseBlockWritable v = new DenseBlockWritable();
+
+      boolean more = qInput.next(key, v);
+      assert more;
+
+      mQt = GivensThinSolver.computeQtHat(v.getBlock(), blockNum == 0 ? 0
+          : 1, new GivensThinSolver.DeepCopyUTIterator(mRs.iterator()));
+      r = mQt[0].length;
+      kp = mQt.length;
+      if (btValue.get() == null)
+        btValue.set(new DenseVector(kp));
+      if (qRowValue.get() == null)
+        qRowValue.set(new DenseVector(kp));
+
+      // also output QHat -- although we don't know the A labels there. Is it
+      // important?
+      // DenseVector qRow = new DenseVector(m_kp);
+      // IntWritable oKey = new IntWritable();
+      // VectorWritable oV = new VectorWritable();
+      //
+      // for ( int i = m_r-1; i>=0; i-- ) {
+      // for ( int j= 0; j < m_kp; j++ )
+      // qRow.setQuick(j, m_qt[j][i]);
+      // oKey.set((m_blockNum<<20)+m_r-i-1);
+      // oV.set(qRow);
+      // // so the block #s range is thus 0..2048, and number of rows per block
+      // is 0..2^20.
+      // // since we are not really sending it out to sort (it is a 'side
+      // file'),
+      // // it doesn't matter if it overflows.
+      // m_outputs.write( OUTPUT_Q, oKey, oV);
+      // }
+      qCount++;
+    }
+
+    @Override
+    protected void cleanup(Context context) throws IOException,
+        InterruptedException {
+
+      if (qInput != null)
+        qInput.close();
+      if (outputs != null)
+        outputs.close();
+      super.cleanup(context);
+    }
+
+    @Override
+    @SuppressWarnings ({"unchecked"})
+    protected void map(Writable key, VectorWritable value, Context context)
+        throws IOException, InterruptedException {
+      if (mQt != null && cnt++ == r)
+        mQt = null;
+      if (mQt == null) {
+        loadNextQt(context);
+        cnt = 1;
+      }
+
+      // output Bt outer products
+      Vector aRow = value.get();
+      int qRowIndex = r - cnt; // because QHats are initially stored in
+                                   // reverse
+      Vector qRow = qRowValue.get();
+      for (int j = 0; j < kp; j++)
+        qRow.setQuick(j, mQt[j][qRowIndex]);
+
+      // make sure Qs are inheriting A row labels.
+      outputs.getCollector(OUTPUT_Q, null).collect(key, qRowValue);
+
+      int n = aRow.size();
+      Vector btRow = btValue.get();
+      for (int i = 0; i < n; i++) {
+        double mul = aRow.getQuick(i);
+        for (int j = 0; j < kp; j++)
+          btRow.setQuick(j, mul * qRow.getQuick(j));
+        btKey.set(i);
+        context.write(btKey, btValue);
+      }
+
+    }
+
+    @Override
+    protected void setup(Context context) throws IOException,
+        InterruptedException {
+      super.setup(context);
+
+      Path qJobPath = new Path(context.getConfiguration().get(PROP_QJOB_PATH));
+
+      FileSystem fs = FileSystem.get(context.getConfiguration());
+      // actually this is kind of dangerous
+      // because this routine thinks we need to create a file name for
+      // our current job and this will use -m-, so it's just serendipity that
+      // we are calling it from the mapper too, as the QJob did.
+      Path qInputPath = new Path(qJobPath, FileOutputFormat.getUniqueFile(
+          context, QJob.OUTPUT_QHAT, ""));
+      qInput = new SequenceFile.Reader(fs, qInputPath,
+          context.getConfiguration());
+
+      blockNum = context.getTaskAttemptID().getTaskID().getId();
+
+      // read all r files _in order of task ids_, i.e. partitions
+      Path rPath = new Path(qJobPath, QJob.OUTPUT_R + "-*");
+      FileStatus[] rFiles = fs.globStatus(rPath);
+
+      if (rFiles == null)
+        throw new IOException("Can't find R inputs ");
+
+      Arrays.sort(rFiles, SSVDSolver.partitionComparator);
+
+      QJobKeyWritable rKey = new QJobKeyWritable();
+      VectorWritable rValue = new VectorWritable();
+
+      int block = 0;
+      for (FileStatus fstat : rFiles) {
+        SequenceFile.Reader rReader = new SequenceFile.Reader(fs,
+            fstat.getPath(), context.getConfiguration());
+        try {
+          rReader.next(rKey, rValue);
+        } finally {
+          rReader.close();
+        }
+        if (block < blockNum && block > 0)
+          GivensThinSolver.mergeR(mRs.get(0),
+              new UpperTriangular(rValue.get()));
+        else
+          mRs.add(new UpperTriangular(rValue.get()));
+        block++;
+      }
+      outputs = new MultipleOutputs(new JobConf(context.getConfiguration()));
+    }
+  }
+
+  public static class OuterProductReducer extends
+      Reducer<IntWritable, VectorWritable, IntWritable, VectorWritable> {
+
+    private VectorWritable oValue = new VectorWritable();
+    private DenseVector accum;
+
+    @Override
+    protected void reduce(IntWritable key, Iterable<VectorWritable> values,
+        Context ctx) throws IOException, InterruptedException {
+      Iterator<VectorWritable> vwIter = values.iterator();
+
+      Vector vec = vwIter.next().get();
+      if (accum == null || accum.size() != vec.size()) {
+        accum = new DenseVector(vec);
+        oValue.set(accum);
+      } else
+        accum.assign(vec);
+
+      while (vwIter.hasNext())
+        accum.addAll(vwIter.next().get());
+      ctx.write(key, oValue);
+    }
+
+  }
+
+  public static void run(Configuration conf, Path inputPathA[],
+      Path inputPathQJob, Path outputPath, int minSplitSize, int k, int p,
+      int numReduceTasks, Class<? extends Writable> labelClass)
+      throws ClassNotFoundException, InterruptedException, IOException {
+
+    JobConf oldApiJob = new JobConf(conf);
+    MultipleOutputs.addNamedOutput(oldApiJob, OUTPUT_Q,
+        org.apache.hadoop.mapred.SequenceFileOutputFormat.class, labelClass,
+        VectorWritable.class);
+
+    Job job = new Job(oldApiJob);
+    job.setJobName("Bt-job");
+    job.setJarByClass(BtJob.class);
+
+    job.setInputFormatClass(SequenceFileInputFormat.class);
+    job.setOutputFormatClass(SequenceFileOutputFormat.class);
+    FileInputFormat.setInputPaths(job, inputPathA);
+    if (minSplitSize > 0)
+      SequenceFileInputFormat.setMinInputSplitSize(job, minSplitSize);
+    FileOutputFormat.setOutputPath(job, outputPath);
+
+    // MultipleOutputs.addNamedOutput(job, OUTPUT_Bt,
+    // SequenceFileOutputFormat.class,
+    // QJobKeyWritable.class,QJobValueWritable.class);
+
+    // Warn: tight hadoop integration here:
+    job.getConfiguration().set("mapreduce.output.basename", OUTPUT_BT);
+    SequenceFileOutputFormat.setCompressOutput(job, true);
+    SequenceFileOutputFormat.setOutputCompressorClass(job, DefaultCodec.class);
+    SequenceFileOutputFormat.setOutputCompressionType(job,
+        CompressionType.BLOCK);
+
+    job.setMapOutputKeyClass(IntWritable.class);
+    job.setMapOutputValueClass(VectorWritable.class);
+
+    job.setOutputKeyClass(IntWritable.class);
+    job.setOutputValueClass(VectorWritable.class);
+
+    job.setMapperClass(BtMapper.class);
+    job.setCombinerClass(OuterProductReducer.class);
+    job.setReducerClass(OuterProductReducer.class);
+    // job.setPartitionerClass(QPartitioner.class);
+
+    // job.getConfiguration().setInt(QJob.PROP_AROWBLOCK_SIZE,aBlockRows );
+    // job.getConfiguration().setLong(PROP_OMEGA_SEED, seed);
+    job.getConfiguration().setInt(QJob.PROP_K, k);
+    job.getConfiguration().setInt(QJob.PROP_P, p);
+    job.getConfiguration().set(PROP_QJOB_PATH, inputPathQJob.toString());
+
+    // number of reduce tasks doesn't matter. we don't actually
+    // send anything to reducers. in fact, the only reason
+    // we need to configure reduce step is so that combiners can fire.
+    // so reduce here is purely symbolic.
+    job.setNumReduceTasks(numReduceTasks);
+
+    job.submit();
+    job.waitForCompletion(false);
+
+    if (!job.isSuccessful())
+      throw new IOException("Bt job unsuccessful.");
+
+  }
+
+}
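
For illustration, the essential mapper arithmetic of BtJob on plain arrays (a sketch, not the job code): for every element a[i] of an A row and the matching Q row q of length k+p, the mapper emits key i with value a[i]*q, and the combiner/reducer sum those partials into Bt row i.

    import java.util.Arrays;

    public class BtSketch {

      // one mapper emission: the partial contribution of this A row to Bt row i
      static double[] partialBtRow(double aElement, double[] qRow) {
        double[] btRow = new double[qRow.length];
        for (int j = 0; j < qRow.length; j++)
          btRow[j] = aElement * qRow[j];
        return btRow;
      }

      public static void main(String[] args) {
        double[] q = { 0.6, 0.8 };        // one row of Q, length k+p
        double[] a = { 2.0, 0.0, -1.0 };  // the matching row of A
        for (int i = 0; i < a.length; i++)
          System.out.println(i + " -> " + Arrays.toString(partialBtRow(a[i], q)));
      }
    }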

Added: mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/DenseBlockWritable.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/DenseBlockWritable.java?rev=1078694&view=auto
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/DenseBlockWritable.java (added)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/DenseBlockWritable.java Mon Mar  7 06:34:12 2011
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.mahout.math.hadoop.stochasticsvd;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Arrays;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.mahout.math.MatrixWritable;
+
+/**
+ * Ad-hoc substitution for {@link MatrixWritable}. Perhaps most useful for
+ * situations with mostly dense data (such as Q-blocks); it reduces GC churn by
+ * reusing the same block memory between loads and writes.
+ * <p>
+ * In the case of Q blocks it doesn't even matter whether the data is dense,
+ * because we need to unpack it into a dense representation for fast access in
+ * computations anyway; and even if it is not so dense, the block compressor in
+ * sequence files will take care of the serialized size.
+ */
+public class DenseBlockWritable implements Writable {
+  double[][] block;
+
+  public DenseBlockWritable() {
+    super();
+  }
+
+  public void setBlock(double[][] block) {
+    this.block = block;
+  }
+
+  public double[][] getBlock() {
+    return block;
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    int m = in.readInt();
+    int n = in.readInt();
+    if (block == null)
+      block = new double[m][0];
+    else if (block.length != m)
+      block = Arrays.copyOf(block, m);
+    for (int i = 0; i < m; i++) {
+      if (block[i] == null || block[i].length != n)
+        block[i] = new double[n];
+      for (int j = 0; j < n; j++)
+        block[i][j] = in.readDouble();
+
+    }
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    int m = block.length;
+    int n = block.length == 0 ? 0 : block[0].length;
+
+    out.writeInt(m);
+    out.writeInt(n);
+    for (int i = 0; i < m; i++)
+      for (int j = 0; j < n; j++)
+        out.writeDouble(block[i][j]);
+  }
+
+}
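
For illustration, a round trip of DenseBlockWritable through the Writable contract (a sketch, not from this patch). The reading instance is meant to be reused across reads, which is the point of the class: the block arrays are reallocated only when the dimensions change.

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;

    import org.apache.mahout.math.hadoop.stochasticsvd.DenseBlockWritable;

    public class DenseBlockRoundTrip {
      public static void main(String[] args) throws IOException {
        DenseBlockWritable out = new DenseBlockWritable();
        out.setBlock(new double[][] { { 1, 2, 3 }, { 4, 5, 6 } });

        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        out.write(new DataOutputStream(bos));

        DenseBlockWritable in = new DenseBlockWritable(); // reused between reads
        in.readFields(new DataInputStream(new ByteArrayInputStream(bos.toByteArray())));
        System.out.println(in.getBlock()[1][2]); // 6.0
      }
    }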

Added: mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/GivensThinSolver.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/GivensThinSolver.java?rev=1078694&view=auto
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/GivensThinSolver.java (added)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/GivensThinSolver.java Mon Mar  7 06:34:12 2011
@@ -0,0 +1,621 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.hadoop.stochasticsvd;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.mahout.math.AbstractVector;
+import org.apache.mahout.math.DenseVector;
+import org.apache.mahout.math.Matrix;
+import org.apache.mahout.math.Vector;
+
+/**
+ * Givens thin QR solver. Standard Givens operations are reordered in a way
+ * that helps us push them through MapReduce operations in a block fashion.
+ */
+public class GivensThinSolver {
+
+  // private static final double s_epsilon = 1e-10;
+
+  // private double[][] m_rTilde;
+  // private Vector m_aRowV;
+  private double[] vARow;
+  private double[] vQtRow;
+  // private UpperTriangular m_rTilde;
+  // private TriangularRowView m_rTildeRowView, m_rTildeRowView2;
+  private double[][] mQt;
+  private double[][] mR;
+  private int qtStartRow;
+  private int rStartRow;
+  private int m; // row count
+  private int n; // column count; m >= n
+  private int cnt;
+  private double[] cs = new double[2];
+
+  public GivensThinSolver(int m, int n) {
+    super();
+
+    if (!(m >= n))
+      throw new IllegalArgumentException("Givens thin QR: must be true: m>=n");
+
+    this.m = m;
+    this.n = n;
+
+    mQt = new double[n][];
+    mR = new double[n][];
+    vARow = new double[n];
+    // m_aRowV = new DenseVector(m_aRow, true);
+    vQtRow = new double[m];
+
+    for (int i = 0; i < n; i++) {
+      mQt[i] = new double[this.m];
+      mR[i] = new double[this.n];
+    }
+    cnt = 0;
+  }
+
+  public void reset() {
+    cnt = 0;
+  }
+
+  public void solve(Matrix a) {
+
+    assert a.rowSize() == m;
+    assert a.columnSize() == n;
+
+    double[] aRow = new double[n];
+    for (int i = 0; i < m; i++) {
+      Vector aRowV = a.getRow(i);
+      for (int j = 0; j < n; j++)
+        aRow[j] = aRowV.getQuick(j);
+      appendRow(aRow);
+    }
+  }
+
+  public boolean isFull() {
+    return cnt == m;
+  }
+
+  public int getM() {
+    return m;
+  }
+
+  public int getN() {
+    return n;
+  }
+
+  public int getCnt() {
+    return cnt;
+  }
+
+  public void adjust(int newM) {
+    if (newM == m)
+      return; // no adjustment is required.
+    if (newM < n)
+      throw new IllegalArgumentException("new m can't be less than n");
+    if (newM < cnt)
+      throw new IllegalArgumentException(
+          "new m can't be less than rows accumulated");
+    vQtRow = new double[newM];
+
+    // grow or shrink qt rows
+    if (newM > m) {
+      // grow qt rows
+      for (int i = 0; i < n; i++) {
+        mQt[i] = Arrays.copyOf(mQt[i], newM);
+        System.arraycopy(mQt[i], 0, mQt[i], newM - m, m);
+        Arrays.fill(mQt[i], 0, newM - m, 0);
+      }
+    } else {
+      // shrink qt rows
+      for (int i = 0; i < n; i++) {
+        mQt[i] = Arrays.copyOfRange(mQt[i], m - newM, m);
+      }
+    }
+
+    m = newM;
+
+  }
+
+  public void trim() {
+    adjust(cnt);
+  }
+
+  /**
+   * API for row-by-row addition.
+   * 
+   * @param aRow row to append (length n)
+   */
+  public void appendRow(double[] aRow) {
+    if (cnt >= m)
+      throw new RuntimeException(
+          "thin QR solver fed more rows than initialized for");
+    try {
+      // moving pointers around is inefficient but
+      // for sanity's sake I am keeping it this way so I don't
+      // have to guess how the R-tilde index maps to the actual block index
+
+      Arrays.fill(vQtRow, 0);
+      vQtRow[m - cnt - 1] = 1;
+      int height = cnt > n ? n : cnt;
+      System.arraycopy(aRow, 0, vARow, 0, n);
+
+      if (height > 0) {
+        givens(vARow[0], getRRow(0)[0], cs);
+        applyGivensInPlace(cs[0], cs[1], vARow, getRRow(0), 0, n);
+        applyGivensInPlace(cs[0], cs[1], vQtRow, getQtRow(0), 0, m);
+      }
+
+      for (int i = 1; i < height; i++) {
+        givens(getRRow(i - 1)[i], getRRow(i)[i], cs);
+        applyGivensInPlace(cs[0], cs[1], getRRow(i - 1), getRRow(i), i,
+            n - i);
+        applyGivensInPlace(cs[0], cs[1], getQtRow(i - 1), getQtRow(i), 0,
+            m);
+      }
+      // push qt and r-tilde 1 row down
+      // just sqp the references to reduce GC churning
+      pushQtDown();
+      double[] swap = getQtRow(0);
+      setQtRow(0, vQtRow);
+      vQtRow = swap;
+
+      // triangular push -- the commented-out alternative below is terribly
+      // inefficient: for each row it basically moves ~2-4MB of memory around.
+      // for (int i = m_n - 1; i > 0; i--) {
+      // // copy (i-1)th row into i-th row ignoring main diagonal item
+      // // which must be 0 now
+      // assert m_rTilde.getQuick(i - 1, i - 1) <= s_epsilon;
+      // for (int j = i; j < m_n; j++)
+      // m_rTilde.setQuick(i, j, m_rTilde.getQuick(i - 1, j));
+      // }
+      // for (int i = 0; i < m_n; i++)
+      // m_rTilde.setQuick(0, i, m_aRow[i]);
+      pushRDown();
+      swap = getRRow(0);
+      setRRow(0, vARow);
+      vARow = swap;
+
+    } finally {
+      cnt++;
+    }
+  }
+
+  private double[] getQtRow(int row) {
+
+    return mQt[(row += qtStartRow) >= n ? row - n : row];
+  }
+
+  private void setQtRow(int row, double[] qtRow) {
+    mQt[(row += qtStartRow) >= n ? row - n : row] = qtRow;
+  }
+
+  private void pushQtDown() {
+    qtStartRow = qtStartRow == 0 ? n - 1 : qtStartRow - 1;
+  }
+
+  private double[] getRRow(int row) {
+    return mR[(row += rStartRow) >= n ? row - n : row];
+  }
+
+  private void setRRow(int row, double[] rrow) {
+    mR[(row += rStartRow) >= n ? row - n : row] = rrow;
+  }
+
+  private void pushRDown() {
+    rStartRow = rStartRow == 0 ? n - 1 : rStartRow - 1;
+  }
+
+  // warning: both of these actually return n+1 rows, with the last one being
+  // uninteresting.
+  public UpperTriangular getRTilde() {
+    UpperTriangular packedR = new UpperTriangular(n);
+    for (int i = 0; i < n; i++)
+      packedR.assignRow(i, getRRow(i));
+    return packedR;
+  }
+
+  public double[][] getThinQtTilde() {
+    if (qtStartRow != 0) {
+      // rotate qt rows into place
+      double[][] qt = new double[n][]; // double[~500][], once per block, not
+                                         // a big deal.
+      System.arraycopy(mQt, qtStartRow, qt, 0, n - qtStartRow);
+      System.arraycopy(mQt, 0, qt, n - qtStartRow, qtStartRow);
+      return qt;
+    }
+    return mQt;
+  }
+
+  public static void applyGivensInPlace(double c, double s, double[] row1,
+      double[] row2, int offset, int len) {
+
+    int n = offset + len;
+    for (int j = offset; j < n; j++) {
+      double tau1 = row1[j];
+      double tau2 = row2[j];
+      row1[j] = c * tau1 - s * tau2;
+      row2[j] = s * tau1 + c * tau2;
+    }
+  }
+
+  public static void applyGivensInPlace(double c, double s, Vector row1,
+      Vector row2, int offset, int len) {
+
+    int n = offset + len;
+    for (int j = offset; j < n; j++) {
+      double tau1 = row1.getQuick(j);
+      double tau2 = row2.getQuick(j);
+      row1.setQuick(j, c * tau1 - s * tau2);
+      row2.setQuick(j, s * tau1 + c * tau2);
+    }
+  }
+
+  public static void applyGivensInPlace(double c, double s, int i, int k,
+      Matrix mx) {
+    int n = mx.columnSize();
+
+    for (int j = 0; j < n; j++) {
+      double tau1 = mx.get(i, j);
+      double tau2 = mx.get(k, j);
+      mx.set(i, j, c * tau1 - s * tau2);
+      mx.set(k, j, s * tau1 + c * tau2);
+    }
+  }
+
+  public static void fromRho(double rho, double[] csOut) {
+    if (rho == 1) {
+      csOut[0] = 0;
+      csOut[1] = 1;
+      return;
+    }
+    if (Math.abs(rho) < 1) {
+      csOut[1] = 2 * rho;
+      csOut[0] = Math.sqrt(1 - csOut[1] * csOut[1]);
+      return;
+    }
+    csOut[0] = 2 / rho;
+    csOut[1] = Math.sqrt(1 - csOut[0] * csOut[0]);
+  }
+
+  public static void givens(double a, double b, double[] csOut) {
+    if (b == 0) {
+      csOut[0] = 1;
+      csOut[1] = 0;
+      return;
+    }
+    if (Math.abs(b) > Math.abs(a)) {
+      double tau = -a / b;
+      csOut[1] = 1 / Math.sqrt(1 + tau * tau);
+      csOut[0] = csOut[1] * tau;
+    } else {
+      double tau = -b / a;
+      csOut[0] = 1 / Math.sqrt(1 + tau * tau);
+      csOut[1] = csOut[0] * tau;
+    }
+  }
+
+  public static double toRho(double c, double s) {
+    if (c == 0)
+      return 1;
+    if (Math.abs(s) < Math.abs(c))
+      return Math.signum(c) * s / 2;
+    else
+      return Math.signum(s) * 2 / c;
+  }
+
+  public static void mergeR(UpperTriangular r1, UpperTriangular r2) {
+    TriangularRowView r1Row = new TriangularRowView(r1);
+    TriangularRowView r2Row = new TriangularRowView(r2);
+    
+    int kp = r1Row.size();
+    assert kp == r2Row.size();
+
+    double[] cs = new double[2];
+
+    for (int v = 0; v < kp; v++) {
+      for (int u = v; u < kp; u++) {
+        givens(r1Row.setViewedRow(u).get(u), r2Row.setViewedRow(u - v).get(u),
+            cs);
+        applyGivensInPlace(cs[0], cs[1], r1Row, r2Row, u, kp - u);
+      }
+    }
+  }
+
+  public static void mergeR(double[][] r1, double[][] r2) {
+    int kp = r1[0].length;
+    assert kp == r2[0].length;
+
+    double[] cs = new double[2];
+
+    for (int v = 0; v < kp; v++) {
+      for (int u = v; u < kp; u++) {
+        givens(r1[u][u], r2[u - v][u], cs);
+        applyGivensInPlace(cs[0], cs[1], r1[u], r2[u - v], u, kp - u);
+      }
+    }
+
+  }
+
+  public static void mergeRonQ(UpperTriangular r1, UpperTriangular r2,
+      double[][] qt1, double[][] qt2) {
+    TriangularRowView r1Row = new TriangularRowView(r1);
+    TriangularRowView r2Row = new TriangularRowView(r2);
+    int kp = r1Row.size();
+    assert kp == r2Row.size();
+    assert kp == qt1.length;
+    assert kp == qt2.length;
+
+    int r = qt1[0].length;
+    assert qt2[0].length == r;
+
+    double[] cs = new double[2];
+
+    for (int v = 0; v < kp; v++) {
+      for (int u = v; u < kp; u++) {
+        givens(r1Row.setViewedRow(u).get(u), r2Row.setViewedRow(u - v).get(u),
+            cs);
+        applyGivensInPlace(cs[0], cs[1], r1Row, r2Row, u, kp - u);
+        applyGivensInPlace(cs[0], cs[1], qt1[u], qt2[u - v], 0, r);
+      }
+    }
+  }
+
+  public static void mergeRonQ(double[][] r1, double[][] r2, double[][] qt1,
+      double[][] qt2) {
+
+    int kp = r1[0].length;
+    assert kp == r2[0].length;
+    assert kp == qt1.length;
+    assert kp == qt2.length;
+
+    int r = qt1[0].length;
+    assert qt2[0].length == r;
+    double[] cs = new double[2];
+
+    // pairwise givens(a, b) so that the a's come off the main diagonal of r1
+    // and the b's come off the v-th upper subdiagonal of r2.
+    for (int v = 0; v < kp; v++) {
+      for (int u = v; u < kp; u++) {
+        givens(r1[u][u], r2[u - v][u], cs);
+        applyGivensInPlace(cs[0], cs[1], r1[u], r2[u - v], u, kp - u);
+        applyGivensInPlace(cs[0], cs[1], qt1[u], qt2[u - v], 0, r);
+      }
+    }
+  }
+
+  // returns merged Q (which in this case is the qt1)
+  public static double[][] mergeQrUp(double[][] qt1, double[][] r1,
+      double[][] r2) {
+    int kp = qt1.length;
+    int r = qt1[0].length;
+
+    double[][] qTilde = new double[kp][];
+    for (int i = 0; i < kp; i++)
+      qTilde[i] = new double[r];
+    mergeRonQ(r1, r2, qt1, qTilde);
+    return qt1;
+  }
+
+  // returns merged Q (which in this case is the qt1)
+  public static double[][] mergeQrUp(double[][] qt1, UpperTriangular r1,
+      UpperTriangular r2) {
+    int kp = qt1.length;
+    int r = qt1[0].length;
+
+    double[][] qTilde = new double[kp][];
+    for (int i = 0; i < kp; i++)
+      qTilde[i] = new double[r];
+    mergeRonQ(r1, r2, qt1, qTilde);
+    return qt1;
+  }
+
+  public static double[][] mergeQrDown(double[][] r1, double[][] qt2,
+      double[][] r2) {
+    int kp = qt2.length;
+    int r = qt2[0].length;
+
+    double[][] qTilde = new double[kp][];
+    for (int i = 0; i < kp; i++)
+      qTilde[i] = new double[r];
+    mergeRonQ(r1, r2, qTilde, qt2);
+    return qTilde;
+
+  }
+
+  public static double[][] mergeQrDown(UpperTriangular r1, double[][] qt2,
+      UpperTriangular r2) {
+    int kp = qt2.length;
+    int r = qt2[0].length;
+
+    double[][] qTilde = new double[kp][];
+    for (int i = 0; i < kp; i++)
+      qTilde[i] = new double[r];
+    mergeRonQ(r1, r2, qTilde, qt2);
+    return qTilde;
+
+  }
+
+  public static double[][] computeQtHat(double[][] qt, int i,
+      Iterator<UpperTriangular> rIter) {
+    UpperTriangular rTilde = rIter.next();
+    for (int j = 1; j < i; j++)
+      mergeR(rTilde, rIter.next());
+    if (i > 0)
+      qt = mergeQrDown(rTilde, qt, rIter.next());
+    for (int j = i + 1; rIter.hasNext(); j++)
+      qt = mergeQrUp(qt, rTilde, rIter.next());
+    return qt;
+  }
+
+  // test helpers
+  public static boolean isOrthonormal(double[][] qt, boolean insufficientRank,
+      double epsilon) {
+    int n = qt.length;
+    int rank = 0;
+    for (int i = 0; i < n; i++) {
+      Vector ei = new DenseVector(qt[i], true);
+
+      double norm = ei.norm(2);
+
+      if (Math.abs(1 - norm) < epsilon)
+        rank++;
+      else if (Math.abs(norm) > epsilon)
+        return false; // not a rank deficiency, either
+
+      for (int j = 0; j <= i; j++) {
+        Vector ej = new DenseVector(qt[j], true);
+        double dot = ei.dot(ej);
+        if (!(Math.abs((i == j && rank > j ? 1 : 0) - dot) < epsilon))
+          return false;
+      }
+    }
+    return (!insufficientRank && rank == n) || (insufficientRank && rank < n);
+
+  }
+
+  public static boolean isOrthonormalBlocked(Iterable<double[][]> qtHats,
+      boolean insufficientRank, double epsilon) {
+    int n = qtHats.iterator().next().length;
+    int rank = 0;
+    for (int i = 0; i < n; i++) {
+      List<Vector> ei = new ArrayList<Vector>();
+      // Vector e_i=new DenseVector (qt[i],true);
+      for (double[][] qtHat : qtHats)
+        ei.add(new DenseVector(qtHat[i], true));
+
+      double norm = 0;
+      for (Vector v : ei)
+        norm += v.dot(v);
+      norm = Math.sqrt(norm);
+      if (Math.abs(1 - norm) < epsilon)
+        rank++;
+      else if (Math.abs(norm) > epsilon)
+        return false; // not a rank deficiency, either
+
+      for (int j = 0; j <= i; j++) {
+        List<Vector> ej = new ArrayList<Vector>();
+        for (double[][] qtHat : qtHats)
+          ej.add(new DenseVector(qtHat[j], true));
+
+        // Vector e_j = new DenseVector ( qt[j], true);
+        double dot = 0;
+        for (int k = 0; k < ei.size(); k++)
+          dot += ei.get(k).dot(ej.get(k));
+        if (!(Math.abs((i == j && rank > j ? 1 : 0) - dot) < epsilon))
+          return false;
+      }
+    }
+    return (!insufficientRank && rank == n) || (insufficientRank && rank < n);
+
+  }
+
+  private static class TriangularRowView extends AbstractVector {
+    private UpperTriangular viewed;
+    private int rowNum;
+
+    public TriangularRowView(UpperTriangular viewed) {
+      super(viewed.columnSize());
+      this.viewed = viewed;
+
+    }
+
+    TriangularRowView setViewedRow(int row) {
+      rowNum = row;
+      return this;
+    }
+
+    @Override
+    public boolean isDense() {
+      return true;
+    }
+
+    @Override
+    public boolean isSequentialAccess() {
+      return false;
+    }
+
+    @Override
+    public Iterator<Element> iterator() {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public Iterator<Element> iterateNonZero() {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public double getQuick(int index) {
+      return viewed.getQuick(rowNum, index);
+    }
+
+    @Override
+    public Vector like() {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void setQuick(int index, double value) {
+      viewed.setQuick(rowNum, index, value);
+
+    }
+
+    @Override
+    public int getNumNondefaultElements() {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    protected Matrix matrixLike(int rows, int columns) {
+      throw new UnsupportedOperationException();
+    }
+
+  }
+
+  public static class DeepCopyUTIterator implements Iterator<UpperTriangular> {
+
+    private Iterator<UpperTriangular> delegate;
+
+    public DeepCopyUTIterator(Iterator<UpperTriangular> del) {
+      super();
+      delegate = del;
+    }
+
+    public boolean hasNext() {
+      return delegate.hasNext();
+    }
+
+    public UpperTriangular next() {
+
+      return new UpperTriangular(delegate.next());
+    }
+
+    public void remove() {
+      delegate.remove();
+    }
+
+  }
+
+}
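
For illustration, a minimal usage sketch of the solver above (not from this patch): stream the rows of a small block into a thin QR and check orthonormality of the resulting thin Qt.

    import org.apache.mahout.math.hadoop.stochasticsvd.GivensThinSolver;
    import org.apache.mahout.math.hadoop.stochasticsvd.UpperTriangular;

    public class GivensThinSolverExample {
      public static void main(String[] args) {
        double[][] a = { { 1, 2 }, { 3, 4 }, { 5, 6 }, { 7, 9 } }; // m=4, n=2
        GivensThinSolver solver = new GivensThinSolver(a.length, a[0].length);
        for (double[] row : a)
          solver.appendRow(row);                  // rows can be streamed in one at a time

        double[][] qt = solver.getThinQtTilde();  // n rows of length m (thin Q, transposed)
        UpperTriangular r = solver.getRTilde();   // n x n upper-triangular factor

        // full-rank input, so the thin Q columns should be orthonormal
        System.out.println(GivensThinSolver.isOrthonormal(qt, false, 1.0e-10));
      }
    }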

Added: mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/Omega.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/Omega.java?rev=1078694&view=auto
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/Omega.java (added)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/Omega.java Mon Mar  7 06:34:12 2011
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.hadoop.stochasticsvd;
+
+import java.util.Arrays;
+import java.util.Random;
+
+import org.apache.mahout.math.SequentialAccessSparseVector;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.Vector.Element;
+
+/**
+ * Simplistic implementation of the Omega matrix in the Stochastic SVD method.
+ */
+public class Omega {
+
+  private long seed;
+  private Random rnd = new Random();
+  private int kp;
+
+  public Omega(long seed, int k, int p) {
+    super();
+    this.seed = seed;
+    kp = k + p;
+
+  }
+
+  public void accumDots(int aIndex, double aElement, double[] yRow) {
+    rnd.setSeed(getOmegaRowSeed(aIndex, seed, rnd));
+    for (int i = 0; i < kp; i++)
+      yRow[i] += rnd.nextGaussian() * aElement;
+  }
+
+  /**
+   * compute YRow=ARow*Omega.
+   * 
+   * @param aRow
+   *          row of matrix A (size n)
+   * @param yRow
+   *          row of matrix Y (result) must be pre-allocated to size of (k+p)
+   */
+  public void computeYRow(Vector aRow, double[] yRow) {
+    assert yRow.length == kp;
+
+    Arrays.fill(yRow, 0);
+    if (aRow instanceof SequentialAccessSparseVector) {
+      int j = 0;
+      for (Element el : aRow) {
+        accumDots(j, el.get(), yRow);
+        j++;
+      }
+      
+    } else {
+      int n = aRow.size();
+      for (int j = 0; j < n; j++)
+        accumDots(j, aRow.getQuick(j), yRow);
+    }
+
+  }
+
+  public long getOmegaRowSeed(int omegaRow, long omegaSeed, Random rnd) {
+    rnd.setSeed(omegaSeed);
+    long rowSeed = rnd.nextLong();
+    rnd.setSeed(rowSeed ^ omegaRow);
+    return rowSeed ^ rnd.nextLong();
+
+  }
+
+  public static long murmur64(byte[] val, int offset, int len, long seed) {
+
+    long m = 0xc6a4a7935bd1e995L;
+    int r = 47;
+    long h = seed ^ (len * m);
+
+    int lt = len >>> 3;
+    for (int i = 0; i < lt; i++, offset += 8) {
+      long k = 0;
+      for (int j = 0; j < 8; j++) {
+        k <<= 8;
+        k |= val[offset + j] & 0xff;
+      }
+
+      k *= m;
+      k ^= k >>> r;
+      k *= m;
+
+      h ^= k;
+      h *= m;
+    }
+    long k = 0;
+
+    if (offset < len) {
+      for (; offset < len; offset++) {
+        k <<= 8;
+        k |= val[offset] & 0xff;
+      }
+      h ^= k;
+      h *= m;
+    }
+
+    h ^= h >>> r;
+    h *= m;
+    h ^= h >>> r;
+    return h;
+
+  }
+
+}
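
For illustration, projecting a single dense A row into k+p dimensions with the Omega class above (a sketch, not from this patch). The Omega entries are never stored: each Omega row is re-generated from the seed and the row index (which is the column index of A) inside accumDots, so Y = A*Omega needs no extra storage for Omega.

    import java.util.Arrays;

    import org.apache.mahout.math.DenseVector;
    import org.apache.mahout.math.Vector;
    import org.apache.mahout.math.hadoop.stochasticsvd.Omega;

    public class OmegaExample {
      public static void main(String[] args) {
        int k = 2;
        int p = 1;
        Omega omega = new Omega(12345L, k, p);

        Vector aRow = new DenseVector(new double[] { 0.5, 0.0, 2.0 });
        double[] yRow = new double[k + p];  // must be pre-allocated to k+p
        omega.computeYRow(aRow, yRow);      // yRow = aRow * Omega

        System.out.println(Arrays.toString(yRow));
      }
    }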

Added: mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/PartialRowEmitter.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/PartialRowEmitter.java?rev=1078694&view=auto
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/PartialRowEmitter.java (added)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/PartialRowEmitter.java Mon Mar  7 06:34:12 2011
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.hadoop.stochasticsvd;
+
+import java.io.IOException;
+
+import org.apache.mahout.math.Vector;
+
+/**
+ * This is part of SSVD prototype code
+ *
+ */
+public interface PartialRowEmitter {
+
+  void emitRow(int rowNum, Vector row) throws IOException;
+
+}

Added: mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/QJob.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/QJob.java?rev=1078694&view=auto
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/QJob.java (added)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/QJob.java Mon Mar  7 06:34:12 2011
@@ -0,0 +1,351 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.hadoop.stochasticsvd;
+
+import java.io.Closeable;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.compress.DefaultCodec;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.lib.MultipleOutputs;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.mahout.common.IOUtils;
+import org.apache.mahout.math.DenseVector;
+import org.apache.mahout.math.VectorWritable;
+
+/**
+ * Compute the first level of QHat-transpose blocks.
+ * 
+ * See MAHOUT-376 working notes for details.
+ */
+@SuppressWarnings("deprecation")
+public class QJob {
+
+  public static final String PROP_OMEGA_SEED = "ssvd.omegaseed";
+  public static final String PROP_K = "ssvd.k";
+  public static final String PROP_P = "ssvd.p";
+  public static final String PROP_AROWBLOCK_SIZE = "ssvd.arowblock.size";
+
+  public static final String OUTPUT_R = "R";
+  public static final String OUTPUT_QHAT = "QHat";
+  // public static final String OUTPUT_Q="Q";
+  public static final String OUTPUT_BT = "Bt";
+
+  public static class QJobKeyWritable implements
+      WritableComparable<QJobKeyWritable> {
+
+    private int taskId;
+    private int taskRowOrdinal;
+
+    @Override
+    public void readFields(DataInput in) throws IOException {
+      taskId = in.readInt();
+      taskRowOrdinal = in.readInt();
+    }
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+      out.writeInt(taskId);
+      out.writeInt(taskRowOrdinal);
+    }
+
+    @Override
+    public int compareTo(QJobKeyWritable o) {
+      if (taskId < o.taskId)
+        return -1;
+      else if (taskId > o.taskId)
+        return 1;
+      if (taskRowOrdinal < o.taskRowOrdinal)
+        return -1;
+      else if (taskRowOrdinal > o.taskRowOrdinal)
+        return 1;
+      return 0;
+    }
+
+  }
+
+  public static class QMapper extends
+      Mapper<Writable, VectorWritable, QJobKeyWritable, VectorWritable> {
+
+    private int kp;
+    private Omega omega;
+    private List<double[]> yLookahead;
+    private GivensThinSolver qSolver;
+    private int blockCnt;
+    // private int m_reducerCount;
+    private int r;
+    private DenseBlockWritable value = new DenseBlockWritable();
+    private QJobKeyWritable key = new QJobKeyWritable();
+    private IntWritable tempKey = new IntWritable();
+    private MultipleOutputs outputs;
+    private LinkedList<Closeable> closeables = new LinkedList<Closeable>();
+    private SequenceFile.Writer tempQw;
+    private Path tempQPath;
+    private List<UpperTriangular> rSubseq = new ArrayList<UpperTriangular>();
+
+    private void flushSolver(Context context) throws IOException,
+        InterruptedException {
+      UpperTriangular r = qSolver.getRTilde();
+      double[][] qt = qSolver.getThinQtTilde();
+
+      rSubseq.add(r);
+
+      value.setBlock(qt);
+      // This should arguably be a sparse row matrix, but the compressor takes
+      // care of redundancy on disk, and in memory we want it dense anyway;
+      // sparse random-access implementations would mostly be a memory
+      // management disaster of rehashes and GC thrashing. (IMHO)
+      getTempQw(context).append(tempKey, value);
+      value.setBlock(null);
+      qSolver.reset();
+    }
+
+    // second pass to run a modified version of computeQHatSequence.
+    @SuppressWarnings("unchecked")
+    private void flushQBlocks(Context ctx) throws IOException,
+        InterruptedException {
+      if (blockCnt == 1) {
+        // Only one block, no temp file, no second pass. This should be the
+        // default mode for efficiency in most cases: surely a mapper should be
+        // able to load the entire split in memory, and we don't even require
+        // that much.
+        value.setBlock(qSolver.getThinQtTilde());
+        outputs.getCollector(OUTPUT_QHAT, null).collect(key, value);
+        outputs.getCollector(OUTPUT_R, null).collect(
+            key,
+            new VectorWritable(new DenseVector(qSolver.getRTilde().getData(),
+                true)));
+
+      } else
+        secondPass(ctx);
+    }
+
+    @SuppressWarnings("unchecked")
+    private void secondPass(Context ctx) throws IOException,
+        InterruptedException {
+      qSolver = null; // release mem
+      FileSystem localFs = FileSystem.getLocal(ctx.getConfiguration());
+      SequenceFile.Reader tempQr = new SequenceFile.Reader(localFs,
+          tempQPath, ctx.getConfiguration());
+      closeables.addFirst(tempQr);
+      int qCnt = 0;
+      while (tempQr.next(tempKey, value)) {
+        value
+            .setBlock(GivensThinSolver.computeQtHat(value.getBlock(), qCnt,
+                new GivensThinSolver.DeepCopyUTIterator(rSubseq.iterator())));
+        if (qCnt == 1) // just merge r[0] <- r[1] so it doesn't have to be
+                       // repeated in subsequent computeQtHat iterations
+          GivensThinSolver.mergeR(rSubseq.get(0), rSubseq.remove(1));
+
+        else
+          qCnt++;
+        outputs.getCollector(OUTPUT_QHAT, null).collect(key, value);
+      }
+
+      assert rSubseq.size() == 1;
+
+      // m_value.setR(m_rSubseq.get(0));
+      outputs.getCollector(OUTPUT_R, null)
+          .collect(
+              key,
+              new VectorWritable(new DenseVector(rSubseq.get(0).getData(),
+                  true)));
+
+    }
+
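+    // Rows of Y = A*Omega are not appended to the Givens solver immediately:
+    // a lookahead buffer of k+p rows is kept, presumably so that cleanup()
+    // always has enough trailing rows to keep the final block at least k+p
+    // rows tall, which the thin QR requires.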
+    @Override
+    protected void map(Writable key, VectorWritable value, Context context)
+      throws IOException, InterruptedException {
+      double[] yRow = null;
+      if (yLookahead.size() == kp) {
+        if (qSolver.isFull()) {
+
+          flushSolver(context);
+          blockCnt++;
+
+        }
+        yRow = yLookahead.remove(0);
+
+        qSolver.appendRow(yRow);
+      } else
+        yRow = new double[kp];
+      omega.computeYRow(value.get(), yRow);
+      yLookahead.add(yRow);
+    }
+
+    @Override
+    protected void setup(final Context context) throws IOException,
+        InterruptedException {
+
+      int k = Integer.parseInt(context.getConfiguration().get(PROP_K));
+      int p = Integer.parseInt(context.getConfiguration().get(PROP_P));
+      kp = k + p;
+      long omegaSeed = Long.parseLong(context.getConfiguration().get(
+          PROP_OMEGA_SEED));
+      r = Integer.parseInt(context.getConfiguration()
+          .get(PROP_AROWBLOCK_SIZE));
+      omega = new Omega(omegaSeed, k, p);
+      yLookahead = new ArrayList<double[]>(kp);
+      qSolver = new GivensThinSolver(r, kp);
+      outputs = new MultipleOutputs(new JobConf(context.getConfiguration()));
+      closeables.addFirst(new Closeable() {
+        @Override
+        public void close() throws IOException {
+          outputs.close();
+        }
+      });
+
+    }
+
+    @Override
+    protected void cleanup(Context context) throws IOException,
+        InterruptedException {
+      try {
+        if (qSolver == null && yLookahead.size() == 0)
+          return;
+        if (qSolver == null)
+          qSolver = new GivensThinSolver(yLookahead.size(), kp);
+        // grow q solver up if necessary
+
+        qSolver.adjust(qSolver.getCnt() + yLookahead.size());
+        while (yLookahead.size() > 0) {
+
+          qSolver.appendRow(yLookahead.remove(0));
+
+        }
+        assert qSolver.isFull();
+        if (++blockCnt > 1) {
+          flushSolver(context);
+          assert tempQw != null;
+          closeables.remove(tempQw);
+          tempQw.close();
+        }
+        flushQBlocks(context);
+
+      } finally {
+        IOUtils.close(closeables);
+      }
+
+    }
+
+    private SequenceFile.Writer getTempQw(Context context) throws IOException {
+      if (tempQw == null) {
+        // Temporary Q output. Hopefully it will not exceed the size of the OS
+        // IO cache, in which case this is cheap since the cache is managed by
+        // the kernel, not the Java GC. And if the IO cache is not big enough,
+        // then at least access is always sequential.
+        String taskTmpDir = System.getProperty("java.io.tmpdir");
+        FileSystem localFs = FileSystem.getLocal(context.getConfiguration());
+        tempQPath = new Path(new Path(taskTmpDir), "q-temp.seq");
+        tempQw = SequenceFile.createWriter(localFs,
+            context.getConfiguration(), tempQPath, IntWritable.class,
+            DenseBlockWritable.class, CompressionType.BLOCK);
+        closeables.addFirst(tempQw);
+        closeables.addFirst(new IOUtils.DeleteFileOnClose(new File(tempQw
+            .toString())));
+
+      }
+      return tempQw;
+    }
+  }
+
+  public static void run(Configuration conf, Path[] inputPaths,
+      Path outputPath, int aBlockRows, int minSplitSize, int k, int p,
+      long seed, int numReduceTasks) throws ClassNotFoundException,
+      InterruptedException, IOException {
+
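+    // The MultipleOutputs used for the named QHat and R outputs comes from the
+    // old "mapred" API, so the named outputs are registered on a JobConf first
+    // and the new-API Job is then constructed from it (presumably the reason
+    // this class carries @SuppressWarnings("deprecation")).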
+    JobConf oldApiJob = new JobConf(conf);
+    MultipleOutputs.addNamedOutput(oldApiJob, OUTPUT_QHAT,
+        org.apache.hadoop.mapred.SequenceFileOutputFormat.class,
+        QJobKeyWritable.class, DenseBlockWritable.class);
+    MultipleOutputs.addNamedOutput(oldApiJob, OUTPUT_R,
+        org.apache.hadoop.mapred.SequenceFileOutputFormat.class,
+        QJobKeyWritable.class, VectorWritable.class);
+
+    Job job = new Job(oldApiJob);
+    job.setJobName("Q-job");
+    job.setJarByClass(QJob.class);
+
+    job.setInputFormatClass(SequenceFileInputFormat.class);
+    FileInputFormat.setInputPaths(job, inputPaths);
+    if (minSplitSize > 0)
+      SequenceFileInputFormat.setMinInputSplitSize(job, minSplitSize);
+
+    FileOutputFormat.setOutputPath(job, outputPath);
+
+    SequenceFileOutputFormat.setCompressOutput(job, true);
+    SequenceFileOutputFormat.setOutputCompressorClass(job, DefaultCodec.class);
+    SequenceFileOutputFormat.setOutputCompressionType(job,
+        CompressionType.BLOCK);
+
+    job.setMapOutputKeyClass(QJobKeyWritable.class);
+    job.setMapOutputValueClass(VectorWritable.class);
+
+    job.setOutputKeyClass(QJobKeyWritable.class);
+    job.setOutputValueClass(VectorWritable.class);
+
+    job.setMapperClass(QMapper.class);
+
+    job.getConfiguration().setInt(PROP_AROWBLOCK_SIZE, aBlockRows);
+    job.getConfiguration().setLong(PROP_OMEGA_SEED, seed);
+    job.getConfiguration().setInt(PROP_K, k);
+    job.getConfiguration().setInt(PROP_P, p);
+
+    // The number of reduce tasks doesn't matter: nothing is actually sent to
+    // reducers. The only reason to configure a reduce step at all would be to
+    // let combiners fire, so the reduce phase here is purely symbolic.
+    job.setNumReduceTasks(0 /* numReduceTasks */);
+
+    job.submit();
+    job.waitForCompletion(false);
+
+    if (!job.isSuccessful())
+      throw new IOException("Q job unsuccessful.");
+
+  }
+
+  public static enum QJobCntEnum {
+    NUM_Q_BLOCKS;
+  }
+
+}
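
A minimal driver sketch (illustrative only: the class name, paths and parameter
values below are made up, and SSVDSolver -- used by SSVDCli further down -- is
presumably the real caller in the pipeline) showing how QJob.run is meant to be
invoked:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.mahout.math.hadoop.stochasticsvd.QJob;

    public class QJobExample {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // rows of A stored as VectorWritable values in SequenceFiles
        Path[] inputPaths = { new Path("/tmp/ssvd/A") };
        Path qJobOutput = new Path("/tmp/ssvd/Q-job"); // QHat and R named outputs
        int k = 40;                // decomposition rank
        int p = 20;                // oversampling
        int aBlockRows = 10000;    // Y block height r, must exceed k + p
        long omegaSeed = 0x5eedL;  // Omega seed shared by all mappers
        QJob.run(conf, inputPaths, qJobOutput, aBlockRows,
            -1 /* minSplitSize: <= 0 leaves split size alone */, k, p,
            omegaSeed, 0 /* map-only job */);
      }
    }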

Added: mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/SSVDCli.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/SSVDCli.java?rev=1078694&view=auto
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/SSVDCli.java (added)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/SSVDCli.java Mon Mar  7 06:34:12 2011
@@ -0,0 +1,126 @@
+/* Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.hadoop.stochasticsvd;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.mahout.common.AbstractJob;
+import org.apache.mahout.math.DenseVector;
+import org.apache.mahout.math.VectorWritable;
+
+/**
+ * Mahout CLI adapter for SSVDSolver
+ * 
+ * 
+ */
+public class SSVDCli extends AbstractJob {
+
+  @Override
+  public int run(String[] args) throws Exception {
+    addInputOption();
+    addOutputOption();
+    addOption("rank", "k", "decomposition rank", true);
+    addOption("oversampling", "p", "oversampling", true);
+    addOption("blockHeight", "r", "Y block height (must be > (k+p))", true);
+    addOption("minSplitSize", "s", "minimum split size", "-1");
+    addOption("computeU", "U", "compute U (true/false)", "true");
+    addOption("uHalfSigma", "uhs", "compute U as UHat = U x pow(Sigma, 0.5)",
+        "false");
+    addOption("computeV", "V", "compute V (true/false)", "true");
+    addOption("vHalfSigma", "vhs", "compute V as VHat = V x pow(Sigma, 0.5)",
+        "false");
+    addOption("reduceTasks", "t", "number of reduce tasks (where applicable)",
+        "1");
+
+    Map<String, String> pargs = parseArguments(args);
+    if (pargs == null)
+      return -1;
+
+    String input = pargs.get("--input");
+    String output = pargs.get("--output");
+    String tempDir = pargs.get("--tempDir");
+    int k = Integer.parseInt(pargs.get("--rank"));
+    int p = Integer.parseInt(pargs.get("--oversampling"));
+    int r = Integer.parseInt(pargs.get("--blockHeight"));
+    int minSplitSize = Integer.parseInt(pargs.get("--minSplitSize"));
+    boolean computeU = Boolean.parseBoolean(pargs.get("--computeU"));
+    boolean computeV = Boolean.parseBoolean(pargs.get("--computeV"));
+    boolean cUHalfSigma = Boolean.parseBoolean(pargs.get("--uHalfSigma"));
+    boolean cVHalfSigma = Boolean.parseBoolean(pargs.get("--vHalfSigma"));
+    int reduceTasks = Integer.parseInt(pargs.get("--reduceTasks"));
+
+    Configuration conf = getConf();
+    if (conf == null)
+      throw new IOException("No Hadoop configuration present");
+
+    SSVDSolver solver = new SSVDSolver(conf, new Path[] { new Path(input) },
+        new Path(tempDir), r, k, p, reduceTasks);
+    solver.setMinSplitSize(minSplitSize);
+    solver.setComputeU(computeU);
+    solver.setComputeV(computeV);
+    solver.setcUHalfSigma(cUHalfSigma);
+    solver.setcVHalfSigma(cVHalfSigma);
+
+    solver.run();
+
+    // housekeeping
+    FileSystem fs = FileSystem.get(conf);
+
+    Path outPath = new Path(output);
+    fs.mkdirs(outPath);
+
+    SequenceFile.Writer sigmaW = SequenceFile.createWriter(fs, conf, new Path(
+        outPath, "sigma"), NullWritable.class, VectorWritable.class);
+    try {
+      VectorWritable sValues = new VectorWritable(new DenseVector(
+          Arrays.copyOf(solver.getSingularValues(), k), true));
+      sigmaW.append(NullWritable.get(), sValues);
+
+    } finally {
+      sigmaW.close();
+    }
+
+    if (computeU) {
+      FileStatus[] uFiles = fs.globStatus(new Path(solver.getUPath()));
+      if (uFiles != null)
+        for (FileStatus uf : uFiles)
+          fs.rename(uf.getPath(), outPath);
+    }
+    if (computeV) {
+      FileStatus[] vFiles = fs.globStatus(new Path(solver.getVPath()));
+      if (vFiles != null)
+        for (FileStatus vf : vFiles)
+          fs.rename(vf.getPath(), outPath);
+
+    }
+    return 0;
+  }
+
+  public static void main(String[] args) throws Exception {
+    System.exit(ToolRunner.run(new SSVDCli(), args));
+  }
+
+}
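
A hedged usage sketch (class name and argument values are illustrative, and
--tempDir is assumed to be AbstractJob's standard temp-path option read by
run() above) showing how the CLI adapter might be driven programmatically
through ToolRunner:

    import org.apache.hadoop.util.ToolRunner;
    import org.apache.mahout.math.hadoop.stochasticsvd.SSVDCli;

    public class SSVDCliExample {
      public static void main(String[] args) throws Exception {
        int rc = ToolRunner.run(new SSVDCli(), new String[] {
            "--input", "/tmp/ssvd/A",       // SequenceFile(s) of VectorWritable rows of A
            "--output", "/tmp/ssvd/out",    // sigma, U and V end up here
            "--tempDir", "/tmp/ssvd/tmp",
            "--rank", "40",                 // k
            "--oversampling", "20",         // p
            "--blockHeight", "10000",       // r, must be > k + p
            "--reduceTasks", "4"
        });
        System.exit(rc);
      }
    }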

Added: mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/SSVDPrototype.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/SSVDPrototype.java?rev=1078694&view=auto
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/SSVDPrototype.java (added)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/SSVDPrototype.java Mon Mar  7 06:34:12 2011
@@ -0,0 +1,383 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.hadoop.stochasticsvd;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+
+import org.apache.mahout.math.DenseMatrix;
+import org.apache.mahout.math.DenseVector;
+import org.apache.mahout.math.SingularValueDecomposition;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.ssvd.EigenSolverWrapper;
+
+/**
+ * SSVD prototype: non-MR concept verification for the Givens QR and basic
+ * SSVD algorithms.
+ * 
+ * @author Dmitriy
+ */
+public class SSVDPrototype {
+
+
+  private Omega omega;
+  private int kp; // k+p
+  private GivensThinSolver qSolver;
+  private double[] yRow;
+  private int cnt;
+  private int blckCnt;
+  private int r;
+  private List<UpperTriangular> rBlocks = new ArrayList<UpperTriangular>();
+  private List<double[][]> qtBlocks = new ArrayList<double[][]>();
+  private List<double[]> yLookahead;
+
+  public SSVDPrototype(long seed, int kp, int r) {
+    super();
+    this.kp = kp;
+    omega = new Omega(seed, kp / 2, kp - (kp / 2));
+    yRow = new double[kp];
+    // m_yRowV = new DenseVector(m_yRow,true);
+    this.r = r;
+    yLookahead = new ArrayList<double[]>(kp);
+  }
+
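+  // First pass: each row of A is projected through Omega into a row of
+  // Y = A*Omega and pushed, through a small lookahead buffer, into a blockwise
+  // Givens thin-QR solver; finishFirstPass() flushes the last block and turns
+  // the per-block Qt blocks into QHat blocks against the accumulated R blocks.
+  // Second pass: A is streamed again and B' = A' * Q is accumulated through
+  // the supplied PartialRowEmitter.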
+  void firstPass(int aRowId, Vector aRow) throws IOException {
+
+    omega.computeYRow(aRow, yRow);
+
+    // Cloning is bad for GC, but this is just a prototype; the real
+    // implementation rotates the use of the y buffer instead.
+    yLookahead.add(yRow.clone());
+
+    while (yLookahead.size() > kp) {
+
+      if (qSolver == null)
+        qSolver = new GivensThinSolver(r, kp);
+
+      qSolver.appendRow(yLookahead.remove(0));
+      if (qSolver.isFull()) {
+        UpperTriangular r = qSolver.getRTilde();
+        double[][] qt = qSolver.getThinQtTilde();
+        qSolver = null;
+        qtBlocks.add(qt);
+        rBlocks.add(r);
+      }
+
+    }
+    cnt++;
+  }
+
+  void finishFirstPass() {
+
+    if (qSolver == null && yLookahead.size() == 0)
+      return;
+    if (qSolver == null)
+      qSolver = new GivensThinSolver(yLookahead.size(), kp);
+    // grow q solver up if necessary
+
+    qSolver.adjust(qSolver.getCnt() + yLookahead.size());
+    while (yLookahead.size() > 0) {
+
+      qSolver.appendRow(yLookahead.remove(0));
+      if (qSolver.isFull()) {
+        UpperTriangular r = qSolver.getRTilde();
+        double[][] qt = qSolver.getThinQtTilde();
+        qSolver = null;
+        qtBlocks.add(qt);
+        rBlocks.add(r);
+      }
+
+    }
+
+    // simulate reducers -- produce qHats
+    for (int i = 0; i < rBlocks.size(); i++)
+      qtBlocks.set(i, GivensThinSolver.computeQtHat(qtBlocks.get(i), i,
+          new DeepCopyIterator(rBlocks.listIterator())));
+    cnt = 0;
+    blckCnt = 0;
+  }
+
+  void secondPass(int aRowId, Vector aRow, PartialRowEmitter btEmitter)
+    throws IOException {
+    int n = aRow.size();
+    double[][] qtHat = qtBlocks.get(blckCnt);
+
+    int r = qtHat[0].length;
+    int qRowBlckIndex = r - cnt - 1; // <-- reverse order since we fed A in
+                                       // reverse
+    double[] qRow = new double[kp];
+    for (int i = 0; i < kp; i++)
+      qRow[i] = qtHat[i][qRowBlckIndex];
+    Vector qRowV = new DenseVector(qRow, true);
+
+    if (++cnt == r) {
+      blckCnt++;
+      cnt = 0;
+    }
+
+    for (int i = 0; i < n; i++)
+      btEmitter.emitRow(i, qRowV.times(aRow.getQuick(i)));
+
+  }
+
+  private static class DeepCopyIterator implements Iterator<UpperTriangular> {
+
+    private Iterator<UpperTriangular> delegate;
+
+    public DeepCopyIterator(Iterator<UpperTriangular> del) {
+      super();
+      delegate = del;
+    }
+
+    public boolean hasNext() {
+      return delegate.hasNext();
+    }
+
+    public UpperTriangular next() {
+
+      return new UpperTriangular(delegate.next());
+    }
+
+    public void remove() {
+      delegate.remove();
+    }
+
+  }
+
+  public static void testThinQr(int dims, int kp, final long rndSeed)
+    throws Exception {
+
+    DenseMatrix mx = new DenseMatrix(dims << 2, dims);
+    // mx.assign(new UnaryFunction() {
+    //
+    // Random m_rnd = new Random(rndSeed);
+    //
+    // @Override
+    // public double apply(double arg0) {
+    // return m_rnd.nextDouble()*1000;
+    // }
+    // });
+
+    Random rnd = new Random();
+    for (int i = 0; i < mx.rowSize(); i++)
+      for (int j = 0; j < mx.columnSize(); j++)
+        mx.set(i, j, rnd.nextDouble() * 1000);
+
+    mx.setQuick(0, 0, 1);
+    mx.setQuick(0, 1, 2);
+    mx.setQuick(0, 2, 3);
+    mx.setQuick(1, 0, 4);
+    mx.setQuick(1, 1, 5);
+    mx.setQuick(1, 2, 6);
+    mx.setQuick(2, 0, 7);
+    mx.setQuick(2, 1, 8);
+    mx.setQuick(2, 2, 9);
+
+    SingularValueDecomposition svd2 = new SingularValueDecomposition(mx);
+    double[] svaluesControl = svd2.getSingularValues();
+
+    for (int i = 0; i < kp; i++)
+      System.out.printf("%.3e ", svaluesControl[i]);
+    System.out.println();
+
+    int m = mx.rowSize(); /* ,n=mx.columnSize(); */
+
+    long seed = new Random().nextLong();
+
+    final Map<Integer, Vector> btRows = new HashMap<Integer, Vector>();
+
+    PartialRowEmitter btEmitter = new PartialRowEmitter() {
+      @Override
+      public void emitRow(int rowNum, Vector row) throws IOException {
+        Vector btRow = btRows.get(rowNum);
+        if (btRow != null)
+          row.addTo(btRow);
+        btRows.put(rowNum, btRow == null ? new DenseVector(row) : btRow);
+      }
+    };
+
+    SSVDPrototype mapperSimulation = new SSVDPrototype(seed, kp, 3000);
+    for (int i = 0; i < m; i++)
+      mapperSimulation.firstPass(i, mx.getRow(i));
+
+    mapperSimulation.finishFirstPass();
+
+    for (int i = 0; i < m; i++)
+      mapperSimulation.secondPass(i, mx.getRow(i), btEmitter);
+
+    // LocalSSVDTest.assertOrthonormality(mapperSimulation.m_qt.transpose(),
+    // false,1e-10);
+
+    // reconstruct bbt
+    final Map<Integer, Vector> bbt = new HashMap<Integer, Vector>();
+    PartialRowEmitter bbtEmitter = new PartialRowEmitter() {
+
+      @Override
+      public void emitRow(int rowNum, Vector row) throws IOException {
+        Vector bbtRow = bbt.get(rowNum);
+        if (bbtRow != null)
+          row.addTo(bbtRow);
+        bbt.put(rowNum, bbtRow == null ? new DenseVector(row) : bbtRow);
+      }
+    };
+
+    for (Map.Entry<Integer, Vector> btRowEntry : btRows.entrySet()) {
+      Vector btRow = btRowEntry.getValue();
+      assert btRow.size() == kp;
+      for (int i = 0; i < kp; i++)
+        bbtEmitter.emitRow(i, btRow.times(btRow.getQuick(i)));
+    }
+
+    double[][] bbtValues = new double[kp][];
+    for (int i = 0; i < kp; i++) {
+      bbtValues[i] = new double[kp];
+      Vector bbtRow = bbt.get(i);
+      for (int j = 0; j < kp; j++)
+        bbtValues[i][j] = bbtRow.getQuick(j);
+    }
+
+    EigenSolverWrapper eigenWrapper = new EigenSolverWrapper(bbtValues);
+    double[] eigenva2 = eigenWrapper.getEigenValues();
+    double[] svalues = new double[kp];
+    for (int i = 0; i < kp; i++)
+      // eigenvalues of B*B' are the squared singular values, hence the sqrt
+      svalues[i] = Math.sqrt(eigenva2[i]);
+
+    for (int i = 0; i < kp; i++)
+      System.out.printf("%.3e ", svalues[i]);
+    System.out.println();
+
+  }
+
+  public static void testBlockQrWithSSVD(int dims, int kp, int r,
+      final long rndSeed) throws Exception {
+
+    DenseMatrix mx = new DenseMatrix(dims << 2, dims);
+    // mx.assign(new UnaryFunction() {
+    //
+    // Random m_rnd = new Random(rndSeed);
+    //
+    // @Override
+    // public double apply(double arg0) {
+    // return (m_rnd.nextDouble()-0.5)*1000;
+    // }
+    // });
+
+    Random rnd = new Random();
+    for (int i = 0; i < mx.rowSize(); i++)
+      for (int j = 0; j < mx.columnSize(); j++)
+        mx.set(i, j, (rnd.nextDouble() - 0.5) * 1000);
+    mx.setQuick(0, 0, 1);
+    mx.setQuick(0, 1, 2);
+    mx.setQuick(0, 2, 3);
+    mx.setQuick(1, 0, 4);
+    mx.setQuick(1, 1, 5);
+    mx.setQuick(1, 2, 6);
+    mx.setQuick(2, 0, 7);
+    mx.setQuick(2, 1, 8);
+    mx.setQuick(2, 2, 9);
+
+    SingularValueDecomposition svd2 = new SingularValueDecomposition(mx);
+    double[] svaluesControl = svd2.getSingularValues();
+
+    for (int i = 0; i < kp; i++)
+      System.out.printf("%e ", svaluesControl[i]);
+    System.out.println();
+
+    int m = mx.rowSize(); /* ,n=mx.columnSize(); */
+
+    final Map<Integer, Vector> btRows = new HashMap<Integer, Vector>();
+
+    PartialRowEmitter btEmitter = new PartialRowEmitter() {
+      @Override
+      public void emitRow(int rowNum, Vector row) throws IOException {
+        Vector btRow = btRows.get(rowNum);
+        if (btRow != null)
+          row.addTo(btRow);
+        btRows.put(rowNum, btRow == null ? new DenseVector(row) : btRow);
+      }
+    };
+
+    SSVDPrototype mapperSimulation = new SSVDPrototype(rndSeed, kp, r);
+    for (int i = 0; i < m; i++)
+      mapperSimulation.firstPass(i, mx.getRow(i));
+
+    mapperSimulation.finishFirstPass();
+
+    for (int i = 0; i < m; i++)
+      mapperSimulation.secondPass(i, mx.getRow(i), btEmitter);
+
+    // LocalSSVDTest.assertOrthonormality(mapperSimulation.m_qt.transpose(),
+    // false,1e-10);
+
+    // reconstruct bbt
+    final Map<Integer, Vector> bbt = new HashMap<Integer, Vector>();
+    PartialRowEmitter bbtEmitter = new PartialRowEmitter() {
+
+      @Override
+      public void emitRow(int rowNum, Vector row) throws IOException {
+        Vector bbtRow = bbt.get(rowNum);
+        if (bbtRow != null)
+          row.addTo(bbtRow);
+        bbt.put(rowNum, bbtRow == null ? new DenseVector(row) : bbtRow);
+      }
+    };
+
+    for (Map.Entry<Integer, Vector> btRowEntry : btRows.entrySet()) {
+      Vector btRow = btRowEntry.getValue();
+      assert btRow.size() == kp;
+      for (int i = 0; i < kp; i++)
+        bbtEmitter.emitRow(i, btRow.times(btRow.getQuick(i)));
+    }
+
+    double[][] bbtValues = new double[kp][];
+    for (int i = 0; i < kp; i++) {
+      bbtValues[i] = new double[kp];
+      Vector bbtRow = bbt.get(i);
+      for (int j = 0; j < kp; j++)
+        bbtValues[i][j] = bbtRow.getQuick(j);
+    }
+
+    EigenSolverWrapper eigenWrapper = new EigenSolverWrapper(bbtValues);
+
+    double[] eigenva2 = eigenWrapper.getEigenValues();
+    double[] svalues = new double[kp];
+    for (int i = 0; i < kp; i++)
+      // eigenvalues of B*B' are the squared singular values, hence the sqrt
+      svalues[i] = Math.sqrt(eigenva2[i]);
+
+    for (int i = 0; i < kp; i++)
+      System.out.printf("%e ", svalues[i]);
+    System.out.println();
+  }
+
+  public static void main(String[] args) throws Exception {
+    // testThinQr();
+    long seed = new Random().nextLong();
+    testBlockQrWithSSVD(200, 200, 800, seed);
+    testBlockQrWithSSVD(200, 20, 800, seed);
+    testBlockQrWithSSVD(200, 20, 850, seed); // test trimming
+    testBlockQrWithSSVD(200, 20, 90, seed);
+    testBlockQrWithSSVD(200, 20, 99, seed);
+  }
+
+}
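
For reference, the algebra these prototype methods exercise is the standard
stochastic SVD scheme (see the MAHOUT-376 working notes); informally, with A
an m x n input matrix and Omega an n x (k+p) random matrix:

    Y = A * Omega                      (m x (k+p))
    Y = Q * R                          thin QR, computed blockwise with Givens rotations
    B = Q' * A                         accumulated above row-by-row as B' = A' * Q
    B * B' = U_B * diag(s_i^2) * U_B'  eigendecomposition of the small (k+p) x (k+p) matrix

so the singular values of A are approximated by s_i = sqrt(eigenvalue_i of
B*B'), which is why the code above takes Math.sqrt of the values returned by
EigenSolverWrapper.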