Posted to commits@mahout.apache.org by ra...@apache.org on 2018/06/04 14:29:20 UTC

[18/53] [abbrv] [partial] mahout git commit: end of day 6-2-2018

http://git-wip-us.apache.org/repos/asf/mahout/blob/5eda9e1f/community/mahout-mr/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/SparseRowBlockWritable.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/SparseRowBlockWritable.java b/community/mahout-mr/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/SparseRowBlockWritable.java
new file mode 100644
index 0000000..b7f5b94
--- /dev/null
+++ b/community/mahout-mr/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/SparseRowBlockWritable.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.hadoop.stochasticsvd;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Arrays;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.mahout.math.Varint;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.VectorWritable;
+import org.apache.mahout.math.function.PlusMult;
+
+/**
+ * Block that supports accumulating rows and their sums, suitable for combiners
+ * and reducers of multiplication jobs.
+ */
+public class SparseRowBlockWritable implements Writable {
+
+  private int[] rowIndices;
+  private Vector[] rows;
+  private int numRows;
+
+  public SparseRowBlockWritable() {
+    this(10);
+  }
+
+  public SparseRowBlockWritable(int initialCapacity) {
+    rowIndices = new int[initialCapacity];
+    rows = new Vector[initialCapacity];
+  }
+
+  public int[] getRowIndices() {
+    return rowIndices;
+  }
+
+  public Vector[] getRows() {
+    return rows;
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    numRows = Varint.readUnsignedVarInt(in);
+    if (rows == null || rows.length < numRows) {
+      rows = new Vector[numRows];
+      rowIndices = new int[numRows];
+    }
+    VectorWritable vw = new VectorWritable();
+    for (int i = 0; i < numRows; i++) {
+      rowIndices[i] = Varint.readUnsignedVarInt(in);
+      vw.readFields(in);
+      rows[i] = vw.get().clone();
+    }
+
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    Varint.writeUnsignedVarInt(numRows, out);
+    VectorWritable vw = new VectorWritable();
+    for (int i = 0; i < numRows; i++) {
+      Varint.writeUnsignedVarInt(rowIndices[i], out);
+      vw.set(rows[i]);
+      vw.write(out);
+    }
+  }
+
+  public void plusRow(int index, Vector row) {
+    /*
+     * Accumulation often goes in row-increasing order, so check for this first
+     * to avoid a binary search (another log(height) multiplier).
+     */
+
+    int pos =
+      numRows == 0 || rowIndices[numRows - 1] < index ? -numRows - 1 : Arrays
+        .binarySearch(rowIndices, 0, numRows, index);
+    if (pos >= 0) {
+      rows[pos].assign(row, PlusMult.plusMult(1));
+    } else {
+      insertIntoPos(-pos - 1, index, row);
+    }
+  }
+
+  private void insertIntoPos(int pos, int rowIndex, Vector row) {
+    // reallocate if needed
+    if (numRows == rows.length) {
+      rows = Arrays.copyOf(rows, (numRows + 1) << 1);
+      rowIndices = Arrays.copyOf(rowIndices, (numRows + 1) << 1);
+    }
+    // make a hole if needed
+    System.arraycopy(rows, pos, rows, pos + 1, numRows - pos);
+    System.arraycopy(rowIndices, pos, rowIndices, pos + 1, numRows - pos);
+    // put
+    rowIndices[pos] = rowIndex;
+    rows[pos] = row.clone();
+    numRows++;
+  }
+
+  /**
+   * Adds one block into another. Use it for accumulation of partial products in
+   * combiners and reducers.
+   * 
+   * @param bOther
+   *          block to add
+   */
+  public void plusBlock(SparseRowBlockWritable bOther) {
+    /*
+     * Since row indices are maintained in sorted order, we can run a sort-merge
+     * to expedite this operation.
+     */
+    int i = 0;
+    int j = 0;
+    while (i < numRows && j < bOther.numRows) {
+      while (i < numRows && rowIndices[i] < bOther.rowIndices[j]) {
+        i++;
+      }
+      if (i < numRows) {
+        if (rowIndices[i] == bOther.rowIndices[j]) {
+          rows[i].assign(bOther.rows[j], PlusMult.plusMult(1));
+        } else {
+          // insert into i-th position
+          insertIntoPos(i, bOther.rowIndices[j], bOther.rows[j]);
+        }
+        // increment in either case
+        i++;
+        j++;
+      }
+    }
+    for (; j < bOther.numRows; j++) {
+      insertIntoPos(numRows, bOther.rowIndices[j], bOther.rows[j]);
+    }
+  }
+
+  public int getNumRows() {
+    return numRows;
+  }
+
+  public void clear() {
+    numRows = 0;
+    Arrays.fill(rows, null);
+  }
+}
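
For reference, a minimal sketch of how partial row blocks can be accumulated with
this class; the SparseRowBlockSketch class and the row indices/values are
hypothetical, and only the API shown in the diff above is assumed. A combiner
would do the same thing with the blocks emitted by mappers under one key.

import org.apache.mahout.math.DenseVector;
import org.apache.mahout.math.hadoop.stochasticsvd.SparseRowBlockWritable;

public class SparseRowBlockSketch {
  public static void main(String[] args) {
    SparseRowBlockWritable block = new SparseRowBlockWritable();
    block.plusRow(3, new DenseVector(new double[] {1, 0, 2}));
    block.plusRow(7, new DenseVector(new double[] {0, 5, 0}));

    SparseRowBlockWritable other = new SparseRowBlockWritable();
    other.plusRow(3, new DenseVector(new double[] {4, 4, 4})); // same index: summed
    other.plusRow(9, new DenseVector(new double[] {1, 1, 1})); // new index: inserted

    block.plusBlock(other);
    // block now holds rows 3, 7 and 9; row 3 is the element-wise sum of both partials
    System.out.println(block.getNumRows()); // 3
  }
}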

http://git-wip-us.apache.org/repos/asf/mahout/blob/5eda9e1f/community/mahout-mr/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/SplitPartitionedWritable.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/SplitPartitionedWritable.java b/community/mahout-mr/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/SplitPartitionedWritable.java
new file mode 100644
index 0000000..7caeb4a
--- /dev/null
+++ b/community/mahout-mr/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/SplitPartitionedWritable.java
@@ -0,0 +1,151 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.mahout.math.hadoop.stochasticsvd;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.Serializable;
+
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.WritableComparator;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.mahout.math.Varint;
+
+/**
+ * A key for vectors that identifies them by their coordinates in the original
+ * splits of A.
+ * 
+ * We assume all passes over A result in the same splits; thus, we can always
+ * prepare side files that come into contact with A so that they are sorted and
+ * partitioned the same way.
+ * <P>
+ * 
+ * The hash code is defined so that all records of the same split go to the same
+ * reducer.
+ * <P>
+ * 
+ * In addition, we define a grouping comparator that groups an entire split
+ * into the same reducer group.
+ * <P>
+ * 
+ */
+public class SplitPartitionedWritable implements
+    WritableComparable<SplitPartitionedWritable> {
+
+  private int taskId;
+  private long taskItemOrdinal;
+
+  public SplitPartitionedWritable(Mapper<?, ?, ?, ?>.Context mapperContext) {
+    // this is essentially the split number of the mapper's input
+    taskId = mapperContext.getTaskAttemptID().getTaskID().getId();
+  }
+
+  public SplitPartitionedWritable() {
+  }
+
+  public int getTaskId() {
+    return taskId;
+  }
+
+  public long getTaskItemOrdinal() {
+    return taskItemOrdinal;
+  }
+
+  public void incrementItemOrdinal() {
+    taskItemOrdinal++;
+  }
+
+  public void setTaskItemOrdinal(long taskItemOrdinal) {
+    this.taskItemOrdinal = taskItemOrdinal;
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    taskId = Varint.readUnsignedVarInt(in);
+    taskItemOrdinal = Varint.readUnsignedVarLong(in);
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    Varint.writeUnsignedVarInt(taskId, out);
+    Varint.writeUnsignedVarLong(taskItemOrdinal, out);
+  }
+
+  @Override
+  public int hashCode() {
+    int prime = 31;
+    int result = 1;
+    result = prime * result + taskId;
+    return result;
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+    if (obj == null) {
+      return false;
+    }
+    if (getClass() != obj.getClass()) {
+      return false;
+    }
+    SplitPartitionedWritable other = (SplitPartitionedWritable) obj;
+    return taskId == other.taskId;
+  }
+
+  @Override
+  public int compareTo(SplitPartitionedWritable o) {
+    if (taskId < o.taskId) {
+      return -1;
+    }
+    if (taskId > o.taskId) {
+      return 1;
+    }
+    if (taskItemOrdinal < o.taskItemOrdinal) {
+      return -1;
+    }
+    if (taskItemOrdinal > o.taskItemOrdinal) {
+      return 1;
+    }
+    return 0;
+  }
+
+  public static final class SplitGroupingComparator extends WritableComparator implements Serializable {
+
+    public SplitGroupingComparator() {
+      super(SplitPartitionedWritable.class, true);
+    }
+
+    @Override
+    public int compare(Object a, Object b) {
+      SplitPartitionedWritable o1 = (SplitPartitionedWritable) a;
+      SplitPartitionedWritable o2 = (SplitPartitionedWritable) b;
+
+      if (o1.taskId < o2.taskId) {
+        return -1;
+      }
+      if (o1.taskId > o2.taskId) {
+        return 1;
+      }
+      return 0;
+    }
+
+  }
+
+}
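
For reference, a hedged sketch of how a downstream job could wire this key in;
the SplitKeySketch class is hypothetical, while the partitioning and grouping
behaviour follows from hashCode(), compareTo() and SplitGroupingComparator above.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.mahout.math.VectorWritable;
import org.apache.mahout.math.hadoop.stochasticsvd.SplitPartitionedWritable;

public class SplitKeySketch {
  public static Job wire(Configuration conf) throws IOException {
    Job job = new Job(conf); // hypothetical job, sketch only
    job.setMapOutputKeyClass(SplitPartitionedWritable.class);
    job.setMapOutputValueClass(VectorWritable.class);
    // the default HashPartitioner uses hashCode(), i.e. only the task id,
    // so all records of one original split land on the same reducer ...
    job.setGroupingComparatorClass(SplitPartitionedWritable.SplitGroupingComparator.class);
    // ... and the grouping comparator collapses the whole split into a single
    // reducer group, while compareTo() still sorts records by
    // (taskId, taskItemOrdinal) within that group.
    return job;
  }
}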

http://git-wip-us.apache.org/repos/asf/mahout/blob/5eda9e1f/community/mahout-mr/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/UJob.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/UJob.java b/community/mahout-mr/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/UJob.java
new file mode 100644
index 0000000..a6db079
--- /dev/null
+++ b/community/mahout-mr/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/UJob.java
@@ -0,0 +1,170 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.hadoop.stochasticsvd;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.compress.DefaultCodec;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.mahout.math.DenseVector;
+import org.apache.mahout.math.Matrix;
+import org.apache.mahout.math.NamedVector;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.VectorWritable;
+import org.apache.mahout.math.function.Functions;
+
+/**
+ * Computes U = Q * Uhat of SSVD (optionally scaling the result by Sigma or by
+ * pow(Sigma, 0.5)).
+ */
+public class UJob {
+  private static final String OUTPUT_U = "u";
+  private static final String PROP_UHAT_PATH = "ssvd.uhat.path";
+  private static final String PROP_SIGMA_PATH = "ssvd.sigma.path";
+  private static final String PROP_OUTPUT_SCALING = "ssvd.u.output.scaling";
+  private static final String PROP_K = "ssvd.k";
+
+  private Job job;
+
+  public void run(Configuration conf, Path inputPathQ, Path inputUHatPath,
+      Path sigmaPath, Path outputPath, int k, int numReduceTasks,
+      Class<? extends Writable> labelClass, SSVDSolver.OutputScalingEnum outputScaling)
+    throws ClassNotFoundException, InterruptedException, IOException {
+
+    job = new Job(conf);
+    job.setJobName("U-job");
+    job.setJarByClass(UJob.class);
+
+    job.setInputFormatClass(SequenceFileInputFormat.class);
+    job.setOutputFormatClass(SequenceFileOutputFormat.class);
+    FileInputFormat.setInputPaths(job, inputPathQ);
+    FileOutputFormat.setOutputPath(job, outputPath);
+
+    // WARN: tight hadoop integration here:
+    job.getConfiguration().set("mapreduce.output.basename", OUTPUT_U);
+    FileOutputFormat.setCompressOutput(job, true);
+    FileOutputFormat.setOutputCompressorClass(job, DefaultCodec.class);
+    SequenceFileOutputFormat.setOutputCompressionType(job, CompressionType.BLOCK);
+
+    job.setMapperClass(UMapper.class);
+    job.setMapOutputKeyClass(IntWritable.class);
+    job.setMapOutputValueClass(VectorWritable.class);
+
+    job.setOutputKeyClass(labelClass);
+    job.setOutputValueClass(VectorWritable.class);
+
+    job.getConfiguration().set(PROP_UHAT_PATH, inputUHatPath.toString());
+    job.getConfiguration().set(PROP_SIGMA_PATH, sigmaPath.toString());
+    job.getConfiguration().set(PROP_OUTPUT_SCALING, outputScaling.name());
+    job.getConfiguration().setInt(PROP_K, k);
+    job.setNumReduceTasks(0);
+    job.submit();
+
+  }
+
+  public void waitForCompletion() throws IOException, ClassNotFoundException,
+      InterruptedException {
+    job.waitForCompletion(false);
+
+    if (!job.isSuccessful()) {
+      throw new IOException("U job unsuccessful.");
+    }
+
+  }
+
+  public static final class UMapper extends
+      Mapper<Writable, VectorWritable, Writable, VectorWritable> {
+
+    private Matrix uHat;
+    private DenseVector uRow;
+    private VectorWritable uRowWritable;
+    private int kp;
+    private int k;
+    private Vector sValues;
+
+    @Override
+    protected void map(Writable key, VectorWritable value, Context context)
+      throws IOException, InterruptedException {
+      Vector qRow = value.get();
+      if (sValues != null) {
+        for (int i = 0; i < k; i++) {
+          uRow.setQuick(i,
+                        qRow.dot(uHat.viewColumn(i)) * sValues.getQuick(i));
+        }
+      } else {
+        for (int i = 0; i < k; i++) {
+          uRow.setQuick(i, qRow.dot(uHat.viewColumn(i)));
+        }
+      }
+
+      /*
+       * MAHOUT-1067: inherit A names too.
+       */
+      if (qRow instanceof NamedVector) {
+        uRowWritable.set(new NamedVector(uRow, ((NamedVector) qRow).getName()));
+      } else {
+        uRowWritable.set(uRow);
+      }
+
+      context.write(key, uRowWritable); // U inherits original A row labels.
+    }
+
+    @Override
+    protected void setup(Context context) throws IOException,
+        InterruptedException {
+      super.setup(context);
+      Path uHatPath = new Path(context.getConfiguration().get(PROP_UHAT_PATH));
+      Path sigmaPath = new Path(context.getConfiguration().get(PROP_SIGMA_PATH));
+      FileSystem fs = FileSystem.get(uHatPath.toUri(), context.getConfiguration());
+
+      uHat = SSVDHelper.drmLoadAsDense(fs, uHatPath, context.getConfiguration());
+      // since uHat is (k+p) x (k+p)
+      kp = uHat.columnSize();
+      k = context.getConfiguration().getInt(PROP_K, kp);
+      uRow = new DenseVector(k);
+      uRowWritable = new VectorWritable(uRow);
+
+      SSVDSolver.OutputScalingEnum outputScaling =
+        SSVDSolver.OutputScalingEnum.valueOf(context.getConfiguration()
+                                                    .get(PROP_OUTPUT_SCALING));
+      switch (outputScaling) {
+        case SIGMA:
+          sValues = SSVDHelper.loadVector(sigmaPath, context.getConfiguration());
+          break;
+        case HALFSIGMA:
+          sValues = SSVDHelper.loadVector(sigmaPath, context.getConfiguration());
+          sValues.assign(Functions.SQRT);
+          break;
+        default:
+      }
+    }
+
+  }
+
+}
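
For reference, the per-row computation UMapper.map performs, pulled out as a
standalone sketch; the URowSketch class is hypothetical and the math mirrors the
mapper above (scaling by sValues only when output scaling requests it).

import org.apache.mahout.math.DenseVector;
import org.apache.mahout.math.Matrix;
import org.apache.mahout.math.Vector;

public class URowSketch {
  // one row of U = Q * Uhat; sValues is null for no scaling, Sigma or Sigma^0.5 otherwise
  static Vector uRow(Vector qRow, Matrix uHat, Vector sValues, int k) {
    Vector u = new DenseVector(k);
    for (int i = 0; i < k; i++) {
      double v = qRow.dot(uHat.viewColumn(i));
      u.setQuick(i, sValues == null ? v : v * sValues.getQuick(i));
    }
    return u;
  }
}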

http://git-wip-us.apache.org/repos/asf/mahout/blob/5eda9e1f/community/mahout-mr/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/VJob.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/VJob.java b/community/mahout-mr/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/VJob.java
new file mode 100644
index 0000000..daee93d
--- /dev/null
+++ b/community/mahout-mr/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/VJob.java
@@ -0,0 +1,224 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.hadoop.stochasticsvd;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
+import org.apache.hadoop.io.compress.DefaultCodec;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.mahout.math.DenseMatrix;
+import org.apache.mahout.math.DenseVector;
+import org.apache.mahout.math.Matrix;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.VectorWritable;
+import org.apache.mahout.math.function.Functions;
+import org.apache.mahout.math.function.PlusMult;
+
+public class VJob {
+  private static final String OUTPUT_V = "v";
+  private static final String PROP_UHAT_PATH = "ssvd.uhat.path";
+  private static final String PROP_SIGMA_PATH = "ssvd.sigma.path";
+  private static final String PROP_OUTPUT_SCALING = "ssvd.v.output.scaling";
+  private static final String PROP_K = "ssvd.k";
+  public static final String PROP_SQ_PATH = "ssvdpca.sq.path";
+  public static final String PROP_XI_PATH = "ssvdpca.xi.path";
+
+  private Job job;
+
+  public static final class VMapper extends
+      Mapper<IntWritable, VectorWritable, IntWritable, VectorWritable> {
+
+    private Matrix uHat;
+    private Vector vRow;
+    private Vector sValues;
+    private VectorWritable vRowWritable;
+    private int kp;
+    private int k;
+    /*
+     * xi and s_q are PCA-related corrections, per MAHOUT-817
+     */
+    private Vector xi;
+    private Vector sq;
+    private final PlusMult plusMult = new PlusMult(0);
+
+    @Override
+    protected void map(IntWritable key, VectorWritable value, Context context)
+      throws IOException, InterruptedException {
+      Vector bCol = value.get();
+      /*
+       * MAHOUT-817: PCA correction for B': b_{col=i} -= s_q * xi_{i}
+       */
+      if (xi != null) {
+        /*
+         * Code defensively against a shortened xi, which may be externally
+         * supplied.
+         */
+        int btIndex = key.get();
+        double xii = xi.size() > btIndex ? xi.getQuick(btIndex) : 0.0;
+        plusMult.setMultiplicator(-xii);
+        bCol.assign(sq, plusMult);
+      }
+
+      for (int i = 0; i < k; i++) {
+        vRow.setQuick(i, bCol.dot(uHat.viewColumn(i)) / sValues.getQuick(i));
+      }
+      context.write(key, vRowWritable);
+    }
+
+    @Override
+    protected void setup(Context context) throws IOException,
+      InterruptedException {
+      super.setup(context);
+
+      Configuration conf = context.getConfiguration();
+      FileSystem fs = FileSystem.get(conf);
+      Path uHatPath = new Path(conf.get(PROP_UHAT_PATH));
+
+      Path sigmaPath = new Path(conf.get(PROP_SIGMA_PATH));
+
+      uHat = SSVDHelper.drmLoadAsDense(fs, uHatPath, conf);
+      // since uHat is (k+p) x (k+p)
+      kp = uHat.columnSize();
+      k = context.getConfiguration().getInt(PROP_K, kp);
+      vRow = new DenseVector(k);
+      vRowWritable = new VectorWritable(vRow);
+
+      sValues = SSVDHelper.loadVector(sigmaPath, conf);
+      SSVDSolver.OutputScalingEnum outputScaling =
+        SSVDSolver.OutputScalingEnum.valueOf(context.getConfiguration()
+                                                    .get(PROP_OUTPUT_SCALING));
+      switch (outputScaling) {
+        case SIGMA:
+          sValues.assign(1.0);
+          break;
+        case HALFSIGMA:
+          sValues = SSVDHelper.loadVector(sigmaPath, context.getConfiguration());
+          sValues.assign(Functions.SQRT);
+          break;
+        default:
+      }
+
+      /*
+       * PCA-related corrections (MAHOUT-817)
+       */
+      String xiPathStr = conf.get(PROP_XI_PATH);
+      if (xiPathStr != null) {
+        xi = SSVDHelper.loadAndSumUpVectors(new Path(xiPathStr), conf);
+        sq =
+          SSVDHelper.loadAndSumUpVectors(new Path(conf.get(PROP_SQ_PATH)), conf);
+      }
+
+    }
+
+  }
+
+  /**
+   * Runs the V-job.
+   * 
+   * @param conf
+   *          job configuration
+   * @param inputPathBt
+   *          path of the B' (Bt-job) output
+   * @param xiPath
+   *          PCA row mean (MAHOUT-817, to fix B')
+   * @param sqPath
+   *          sq (MAHOUT-817, to fix B')
+   * @param inputUHatPath
+   *          path of the Uhat matrix
+   * @param inputSigmaPath
+   *          path of the singular values vector
+   * @param outputPath
+   *          output path for V
+   * @param k
+   *          decomposition rank
+   * @param numReduceTasks
+   *          number of reduce tasks (the job as written is map-only)
+   * @param outputScaling output scaling: apply Sigma, or Sigma^0.5, or none
+   * @throws ClassNotFoundException
+   * @throws InterruptedException
+   * @throws IOException
+   */
+  public void run(Configuration conf,
+                  Path inputPathBt,
+                  Path xiPath,
+                  Path sqPath,
+
+                  Path inputUHatPath,
+                  Path inputSigmaPath,
+
+                  Path outputPath,
+                  int k,
+                  int numReduceTasks,
+                  SSVDSolver.OutputScalingEnum outputScaling) throws ClassNotFoundException,
+    InterruptedException, IOException {
+
+    job = new Job(conf);
+    job.setJobName("V-job");
+    job.setJarByClass(VJob.class);
+
+    job.setInputFormatClass(SequenceFileInputFormat.class);
+    job.setOutputFormatClass(SequenceFileOutputFormat.class);
+    FileInputFormat.setInputPaths(job, inputPathBt);
+    FileOutputFormat.setOutputPath(job, outputPath);
+
+    // Warn: tight hadoop integration here:
+    job.getConfiguration().set("mapreduce.output.basename", OUTPUT_V);
+    FileOutputFormat.setCompressOutput(job, true);
+    FileOutputFormat.setOutputCompressorClass(job, DefaultCodec.class);
+    SequenceFileOutputFormat.setOutputCompressionType(job,
+                                                      CompressionType.BLOCK);
+
+    job.setMapOutputKeyClass(IntWritable.class);
+    job.setMapOutputValueClass(VectorWritable.class);
+
+    job.setOutputKeyClass(IntWritable.class);
+    job.setOutputValueClass(VectorWritable.class);
+
+    job.setMapperClass(VMapper.class);
+
+    job.getConfiguration().set(PROP_UHAT_PATH, inputUHatPath.toString());
+    job.getConfiguration().set(PROP_SIGMA_PATH, inputSigmaPath.toString());
+    job.getConfiguration().set(PROP_OUTPUT_SCALING, outputScaling.name());
+    job.getConfiguration().setInt(PROP_K, k);
+    job.setNumReduceTasks(0);
+
+    /*
+     * PCA-related options, MAHOUT-817
+     */
+    if (xiPath != null) {
+      job.getConfiguration().set(PROP_XI_PATH, xiPath.toString());
+      job.getConfiguration().set(PROP_SQ_PATH, sqPath.toString());
+    }
+
+    job.submit();
+
+  }
+
+  public void waitForCompletion() throws IOException, ClassNotFoundException,
+    InterruptedException {
+    job.waitForCompletion(false);
+
+    if (!job.isSuccessful()) {
+      throw new IOException("V job unsuccessful.");
+    }
+
+  }
+
+}
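
For reference, a standalone sketch of the per-column computation in VMapper.map,
including the MAHOUT-817 PCA correction; the VRowSketch class and its inputs are
hypothetical, and the logic mirrors the mapper above.

import org.apache.mahout.math.DenseVector;
import org.apache.mahout.math.Matrix;
import org.apache.mahout.math.Vector;
import org.apache.mahout.math.function.PlusMult;

public class VRowSketch {
  // one row of V from a column of B'; if xi is given, apply b_col -= xi_i * s_q first
  static Vector vRow(int btIndex, Vector bCol, Matrix uHat, Vector sValues,
                     Vector xi, Vector sq, int k) {
    if (xi != null) {
      double xii = xi.size() > btIndex ? xi.getQuick(btIndex) : 0.0;
      bCol.assign(sq, PlusMult.plusMult(-xii));
    }
    Vector v = new DenseVector(k);
    for (int i = 0; i < k; i++) {
      v.setQuick(i, bCol.dot(uHat.viewColumn(i)) / sValues.getQuick(i));
    }
    return v;
  }
}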

http://git-wip-us.apache.org/repos/asf/mahout/blob/5eda9e1f/community/mahout-mr/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/YtYJob.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/YtYJob.java b/community/mahout-mr/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/YtYJob.java
new file mode 100644
index 0000000..378a885
--- /dev/null
+++ b/community/mahout-mr/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/YtYJob.java
@@ -0,0 +1,220 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.mahout.math.hadoop.stochasticsvd;
+
+import org.apache.commons.lang3.Validate;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.mahout.math.DenseVector;
+import org.apache.mahout.math.UpperTriangular;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.VectorWritable;
+
+import java.io.IOException;
+
+/**
+ * Job that accumulates Y'Y output
+ */
+public final class YtYJob {
+
+  public static final String PROP_OMEGA_SEED = "ssvd.omegaseed";
+  public static final String PROP_K = "ssvd.k";
+  public static final String PROP_P = "ssvd.p";
+
+  // we have a single output, so we use the standard output
+  public static final String OUTPUT_YT_Y = "part-";
+
+  private YtYJob() {
+  }
+
+  public static class YtYMapper extends
+    Mapper<Writable, VectorWritable, IntWritable, VectorWritable> {
+
+    private int kp;
+    private Omega omega;
+    private UpperTriangular mYtY;
+
+    /*
+     * We keep yRow in a dense form here but take care not to densify things
+     * unnecessarily while doing the YtY products. A sparse vector is unlikely to
+     * bring much performance benefit since we have to assume that y is more often
+     * dense than sparse, so bulk dense operations should perform somewhat better
+     * than frequent updates to a RandomAccessSparse vector.
+     */
+    private Vector yRow;
+
+    @Override
+    protected void setup(Context context) throws IOException,
+      InterruptedException {
+      int k = context.getConfiguration().getInt(PROP_K, -1);
+      int p = context.getConfiguration().getInt(PROP_P, -1);
+
+      Validate.isTrue(k > 0, "invalid k parameter");
+      Validate.isTrue(p > 0, "invalid p parameter");
+
+      kp = k + p;
+      long omegaSeed =
+        Long.parseLong(context.getConfiguration().get(PROP_OMEGA_SEED));
+
+      omega = new Omega(omegaSeed, k + p);
+
+      mYtY = new UpperTriangular(kp);
+
+      // see which one works better!
+      // yRow = new RandomAccessSparseVector(kp);
+      yRow = new DenseVector(kp);
+    }
+
+    @Override
+    protected void map(Writable key, VectorWritable value, Context context)
+      throws IOException, InterruptedException {
+      omega.computeYRow(value.get(), yRow);
+      // compute outer product update for YtY
+
+      if (yRow.isDense()) {
+        for (int i = 0; i < kp; i++) {
+          double yi;
+          if ((yi = yRow.getQuick(i)) == 0.0) {
+            continue; // avoid densing up here unnecessarily
+          }
+          for (int j = i; j < kp; j++) {
+            double yj;
+            if ((yj = yRow.getQuick(j)) != 0.0) {
+              mYtY.setQuick(i, j, mYtY.getQuick(i, j) + yi * yj);
+            }
+          }
+        }
+      } else {
+        /*
+         * The disadvantage of using a sparse vector here (aside from creating
+         * some short-lived references) is that we obviously do twice as many
+         * iterations as necessary if the y row is fairly dense.
+         */
+        for (Vector.Element eli : yRow.nonZeroes()) {
+          int i = eli.index();
+          for (Vector.Element elj : yRow.nonZeroes()) {
+            int j = elj.index();
+            if (j < i) {
+              continue;
+            }
+            mYtY.setQuick(i, j, mYtY.getQuick(i, j) + eli.get() * elj.get());
+          }
+        }
+      }
+    }
+
+    @Override
+    protected void cleanup(Context context) throws IOException,
+      InterruptedException {
+      context.write(new IntWritable(context.getTaskAttemptID().getTaskID()
+                                      .getId()),
+                    new VectorWritable(new DenseVector(mYtY.getData())));
+    }
+  }
+
+  public static class YtYReducer extends
+    Reducer<IntWritable, VectorWritable, IntWritable, VectorWritable> {
+    private final VectorWritable accum = new VectorWritable();
+    private DenseVector acc;
+
+    @Override
+    protected void setup(Context context) throws IOException,
+      InterruptedException {
+      int k = context.getConfiguration().getInt(PROP_K, -1);
+      int p = context.getConfiguration().getInt(PROP_P, -1);
+
+      Validate.isTrue(k > 0, "invalid k parameter");
+      Validate.isTrue(p > 0, "invalid p parameter");
+      accum.set(acc = new DenseVector(k + p));
+    }
+
+    @Override
+    protected void cleanup(Context context) throws IOException,
+      InterruptedException {
+      context.write(new IntWritable(), accum);
+    }
+
+    @Override
+    protected void reduce(IntWritable key,
+                          Iterable<VectorWritable> values,
+                          Context arg2) throws IOException,
+      InterruptedException {
+      for (VectorWritable vw : values) {
+        acc.addAll(vw.get());
+      }
+    }
+  }
+
+  public static void run(Configuration conf,
+                         Path[] inputPaths,
+                         Path outputPath,
+                         int k,
+                         int p,
+                         long seed) throws ClassNotFoundException,
+    InterruptedException, IOException {
+
+    Job job = new Job(conf);
+    job.setJobName("YtY-job");
+    job.setJarByClass(YtYJob.class);
+
+    job.setInputFormatClass(SequenceFileInputFormat.class);
+    FileInputFormat.setInputPaths(job, inputPaths);
+    FileOutputFormat.setOutputPath(job, outputPath);
+
+    SequenceFileOutputFormat.setOutputCompressionType(job,
+                                                      CompressionType.BLOCK);
+
+    job.setMapOutputKeyClass(IntWritable.class);
+    job.setMapOutputValueClass(VectorWritable.class);
+
+    job.setOutputKeyClass(IntWritable.class);
+    job.setOutputValueClass(VectorWritable.class);
+
+    job.setMapperClass(YtYMapper.class);
+
+    job.getConfiguration().setLong(PROP_OMEGA_SEED, seed);
+    job.getConfiguration().setInt(PROP_K, k);
+    job.getConfiguration().setInt(PROP_P, p);
+
+    /*
+     * We must reduce to just one matrix, which means we need only one reducer.
+     * That is fine since each mapper outputs only one vector (a packed
+     * UpperTriangular), so even if there are thousands of mappers, one reducer
+     * should cope just fine.
+     */
+    job.setNumReduceTasks(1);
+
+    job.submit();
+    job.waitForCompletion(false);
+
+    if (!job.isSuccessful()) {
+      throw new IOException("YtY job unsuccessful.");
+    }
+
+  }
+
+}
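
For reference, the accumulation YtYMapper.map performs, as a standalone sketch
over an in-memory set of y rows; the YtYSketch class is hypothetical, and the
update mirrors the dense branch of the mapper above.

import org.apache.mahout.math.UpperTriangular;
import org.apache.mahout.math.Vector;

public class YtYSketch {
  // upper triangle of Y'Y = sum over rows y of the outer product y y'
  static UpperTriangular ytY(Iterable<Vector> yRows, int kp) {
    UpperTriangular acc = new UpperTriangular(kp);
    for (Vector y : yRows) {
      for (int i = 0; i < kp; i++) {
        double yi = y.getQuick(i);
        if (yi == 0.0) {
          continue; // skip zero entries, exactly like the mapper
        }
        for (int j = i; j < kp; j++) {
          acc.setQuick(i, j, acc.getQuick(i, j) + yi * y.getQuick(j));
        }
      }
    }
    return acc;
  }
}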

http://git-wip-us.apache.org/repos/asf/mahout/blob/5eda9e1f/community/mahout-mr/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/qr/GivensThinSolver.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/qr/GivensThinSolver.java b/community/mahout-mr/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/qr/GivensThinSolver.java
new file mode 100644
index 0000000..af79cb4
--- /dev/null
+++ b/community/mahout-mr/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/qr/GivensThinSolver.java
@@ -0,0 +1,643 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.hadoop.stochasticsvd.qr;
+
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+
+import com.google.common.collect.Lists;
+import org.apache.mahout.math.AbstractVector;
+import org.apache.mahout.math.DenseVector;
+import org.apache.mahout.math.Matrix;
+import org.apache.mahout.math.OrderedIntDoubleMapping;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.UpperTriangular;
+
+/**
+ * Givens thin QR solver. Standard Givens operations are reordered in a way that
+ * helps us push them through MapReduce operations in a block fashion.
+ */
+public class GivensThinSolver {
+
+  private double[] vARow;
+  private double[] vQtRow;
+  private final double[][] mQt;
+  private final double[][] mR;
+  private int qtStartRow;
+  private int rStartRow;
+  private int m;
+  private final int n; // m - row count, n - column count, m >= n
+  private int cnt;
+  private final double[] cs = new double[2];
+
+  public GivensThinSolver(int m, int n) {
+    if (!(m >= n)) {
+      throw new IllegalArgumentException("Givens thin QR: must be true: m>=n");
+    }
+
+    this.m = m;
+    this.n = n;
+
+    mQt = new double[n][];
+    mR = new double[n][];
+    vARow = new double[n];
+    vQtRow = new double[m];
+
+    for (int i = 0; i < n; i++) {
+      mQt[i] = new double[this.m];
+      mR[i] = new double[this.n];
+    }
+    cnt = 0;
+  }
+
+  public void reset() {
+    cnt = 0;
+  }
+
+  public void solve(Matrix a) {
+
+    assert a.rowSize() == m;
+    assert a.columnSize() == n;
+
+    double[] aRow = new double[n];
+    for (int i = 0; i < m; i++) {
+      Vector aRowV = a.viewRow(i);
+      for (int j = 0; j < n; j++) {
+        aRow[j] = aRowV.getQuick(j);
+      }
+      appendRow(aRow);
+    }
+  }
+
+  public boolean isFull() {
+    return cnt == m;
+  }
+
+  public int getM() {
+    return m;
+  }
+
+  public int getN() {
+    return n;
+  }
+
+  public int getCnt() {
+    return cnt;
+  }
+
+  public void adjust(int newM) {
+    if (newM == m) {
+      // no adjustment is required.
+      return; 
+    }
+    if (newM < n) {
+      throw new IllegalArgumentException("new m can't be less than n");
+    }
+    if (newM < cnt) {
+      throw new IllegalArgumentException(
+          "new m can't be less than rows accumulated");
+    }
+    vQtRow = new double[newM];
+
+    // grow or shrink qt rows
+    if (newM > m) {
+      // grow qt rows
+      for (int i = 0; i < n; i++) {
+        mQt[i] = Arrays.copyOf(mQt[i], newM);
+        System.arraycopy(mQt[i], 0, mQt[i], newM - m, m);
+        Arrays.fill(mQt[i], 0, newM - m, 0);
+      }
+    } else {
+      // shrink qt rows
+      for (int i = 0; i < n; i++) {
+        mQt[i] = Arrays.copyOfRange(mQt[i], m - newM, m);
+      }
+    }
+
+    m = newM;
+
+  }
+
+  public void trim() {
+    adjust(cnt);
+  }
+
+  /**
+   * API for row-by-row addition.
+   * 
+   * @param aRow
+   *          row of A to append
+   */
+  public void appendRow(double[] aRow) {
+    if (cnt >= m) {
+      throw new IllegalStateException("thin QR solver fed more rows than initialized for");
+    }
+    try {
+      /*
+       * Moving pointers around is inefficient, but for sanity's sake I am
+       * keeping it this way so I don't have to guess how the R-tilde index maps
+       * to the actual block index.
+       */
+      Arrays.fill(vQtRow, 0);
+      vQtRow[m - cnt - 1] = 1;
+      int height = cnt > n ? n : cnt;
+      System.arraycopy(aRow, 0, vARow, 0, n);
+
+      if (height > 0) {
+        givens(vARow[0], getRRow(0)[0], cs);
+        applyGivensInPlace(cs[0], cs[1], vARow, getRRow(0), 0, n);
+        applyGivensInPlace(cs[0], cs[1], vQtRow, getQtRow(0), 0, m);
+      }
+
+      for (int i = 1; i < height; i++) {
+        givens(getRRow(i - 1)[i], getRRow(i)[i], cs);
+        applyGivensInPlace(cs[0], cs[1], getRRow(i - 1), getRRow(i), i,
+            n - i);
+        applyGivensInPlace(cs[0], cs[1], getQtRow(i - 1), getQtRow(i), 0,
+            m);
+      }
+      /*
+       * push qt and r-tilde 1 row down
+       * 
+       * just swap the references to reduce GC churning
+       */
+      pushQtDown();
+      double[] swap = getQtRow(0);
+      setQtRow(0, vQtRow);
+      vQtRow = swap;
+
+      pushRDown();
+      swap = getRRow(0);
+      setRRow(0, vARow);
+      vARow = swap;
+
+    } finally {
+      cnt++;
+    }
+  }
+
+  private double[] getQtRow(int row) {
+
+    return mQt[(row += qtStartRow) >= n ? row - n : row];
+  }
+
+  private void setQtRow(int row, double[] qtRow) {
+    mQt[(row += qtStartRow) >= n ? row - n : row] = qtRow;
+  }
+
+  private void pushQtDown() {
+    qtStartRow = qtStartRow == 0 ? n - 1 : qtStartRow - 1;
+  }
+
+  private double[] getRRow(int row) {
+    row += rStartRow;
+    return mR[row >= n ? row - n : row];
+  }
+
+  private void setRRow(int row, double[] rrow) {
+    mR[(row += rStartRow) >= n ? row - n : row] = rrow;
+  }
+
+  private void pushRDown() {
+    rStartRow = rStartRow == 0 ? n - 1 : rStartRow - 1;
+  }
+
+  /*
+   * Warning: both of these actually return n+1 rows, with the last one being
+   * of no interest.
+   */
+  public UpperTriangular getRTilde() {
+    UpperTriangular packedR = new UpperTriangular(n);
+    for (int i = 0; i < n; i++) {
+      packedR.assignNonZeroElementsInRow(i, getRRow(i));
+    }
+    return packedR;
+  }
+
+  public double[][] getThinQtTilde() {
+    if (qtStartRow != 0) {
+      /*
+       * rotate qt rows into place
+       * 
+       * double[~500][], once per block, not a big deal.
+       */
+      double[][] qt = new double[n][]; 
+      System.arraycopy(mQt, qtStartRow, qt, 0, n - qtStartRow);
+      System.arraycopy(mQt, 0, qt, n - qtStartRow, qtStartRow);
+      return qt;
+    }
+    return mQt;
+  }
+
+  public static void applyGivensInPlace(double c, double s, double[] row1,
+      double[] row2, int offset, int len) {
+
+    int n = offset + len;
+    for (int j = offset; j < n; j++) {
+      double tau1 = row1[j];
+      double tau2 = row2[j];
+      row1[j] = c * tau1 - s * tau2;
+      row2[j] = s * tau1 + c * tau2;
+    }
+  }
+
+  public static void applyGivensInPlace(double c, double s, Vector row1,
+      Vector row2, int offset, int len) {
+
+    int n = offset + len;
+    for (int j = offset; j < n; j++) {
+      double tau1 = row1.getQuick(j);
+      double tau2 = row2.getQuick(j);
+      row1.setQuick(j, c * tau1 - s * tau2);
+      row2.setQuick(j, s * tau1 + c * tau2);
+    }
+  }
+
+  public static void applyGivensInPlace(double c, double s, int i, int k,
+      Matrix mx) {
+    int n = mx.columnSize();
+
+    for (int j = 0; j < n; j++) {
+      double tau1 = mx.get(i, j);
+      double tau2 = mx.get(k, j);
+      mx.set(i, j, c * tau1 - s * tau2);
+      mx.set(k, j, s * tau1 + c * tau2);
+    }
+  }
+
+  public static void fromRho(double rho, double[] csOut) {
+    if (rho == 1) {
+      csOut[0] = 0;
+      csOut[1] = 1;
+      return;
+    }
+    if (Math.abs(rho) < 1) {
+      csOut[1] = 2 * rho;
+      csOut[0] = Math.sqrt(1 - csOut[1] * csOut[1]);
+      return;
+    }
+    csOut[0] = 2 / rho;
+    csOut[1] = Math.sqrt(1 - csOut[0] * csOut[0]);
+  }
+
+  public static void givens(double a, double b, double[] csOut) {
+    if (b == 0) {
+      csOut[0] = 1;
+      csOut[1] = 0;
+      return;
+    }
+    if (Math.abs(b) > Math.abs(a)) {
+      double tau = -a / b;
+      csOut[1] = 1 / Math.sqrt(1 + tau * tau);
+      csOut[0] = csOut[1] * tau;
+    } else {
+      double tau = -b / a;
+      csOut[0] = 1 / Math.sqrt(1 + tau * tau);
+      csOut[1] = csOut[0] * tau;
+    }
+  }
+
+  public static double toRho(double c, double s) {
+    if (c == 0) {
+      return 1;
+    }
+    if (Math.abs(s) < Math.abs(c)) {
+      return Math.signum(c) * s / 2;
+    } else {
+      return Math.signum(s) * 2 / c;
+    }
+  }
+
+  public static void mergeR(UpperTriangular r1, UpperTriangular r2) {
+    TriangularRowView r1Row = new TriangularRowView(r1);
+    TriangularRowView r2Row = new TriangularRowView(r2);
+    
+    int kp = r1Row.size();
+    assert kp == r2Row.size();
+
+    double[] cs = new double[2];
+
+    for (int v = 0; v < kp; v++) {
+      for (int u = v; u < kp; u++) {
+        givens(r1Row.setViewedRow(u).get(u), r2Row.setViewedRow(u - v).get(u),
+            cs);
+        applyGivensInPlace(cs[0], cs[1], r1Row, r2Row, u, kp - u);
+      }
+    }
+  }
+
+  public static void mergeR(double[][] r1, double[][] r2) {
+    int kp = r1[0].length;
+    assert kp == r2[0].length;
+
+    double[] cs = new double[2];
+
+    for (int v = 0; v < kp; v++) {
+      for (int u = v; u < kp; u++) {
+        givens(r1[u][u], r2[u - v][u], cs);
+        applyGivensInPlace(cs[0], cs[1], r1[u], r2[u - v], u, kp - u);
+      }
+    }
+
+  }
+
+  public static void mergeRonQ(UpperTriangular r1, UpperTriangular r2,
+      double[][] qt1, double[][] qt2) {
+    TriangularRowView r1Row = new TriangularRowView(r1);
+    TriangularRowView r2Row = new TriangularRowView(r2);
+    int kp = r1Row.size();
+    assert kp == r2Row.size();
+    assert kp == qt1.length;
+    assert kp == qt2.length;
+
+    int r = qt1[0].length;
+    assert qt2[0].length == r;
+
+    double[] cs = new double[2];
+
+    for (int v = 0; v < kp; v++) {
+      for (int u = v; u < kp; u++) {
+        givens(r1Row.setViewedRow(u).get(u), r2Row.setViewedRow(u - v).get(u),
+            cs);
+        applyGivensInPlace(cs[0], cs[1], r1Row, r2Row, u, kp - u);
+        applyGivensInPlace(cs[0], cs[1], qt1[u], qt2[u - v], 0, r);
+      }
+    }
+  }
+
+  public static void mergeRonQ(double[][] r1, double[][] r2, double[][] qt1,
+      double[][] qt2) {
+
+    int kp = r1[0].length;
+    assert kp == r2[0].length;
+    assert kp == qt1.length;
+    assert kp == qt2.length;
+
+    int r = qt1[0].length;
+    assert qt2[0].length == r;
+    double[] cs = new double[2];
+
+    /*
+     * Pairwise givens(a, b) so that the a values come off the main diagonal of
+     * r1 and the b values come off the u-th upper subdiagonal of r2.
+     */
+    for (int v = 0; v < kp; v++) {
+      for (int u = v; u < kp; u++) {
+        givens(r1[u][u], r2[u - v][u], cs);
+        applyGivensInPlace(cs[0], cs[1], r1[u], r2[u - v], u, kp - u);
+        applyGivensInPlace(cs[0], cs[1], qt1[u], qt2[u - v], 0, r);
+      }
+    }
+  }
+
+  // returns merged Q (which in this case is the qt1)
+  public static double[][] mergeQrUp(double[][] qt1, double[][] r1,
+      double[][] r2) {
+    int kp = qt1.length;
+    int r = qt1[0].length;
+
+    double[][] qTilde = new double[kp][];
+    for (int i = 0; i < kp; i++) {
+      qTilde[i] = new double[r];
+    }
+    mergeRonQ(r1, r2, qt1, qTilde);
+    return qt1;
+  }
+
+  // returns merged Q (which in this case is the qt1)
+  public static double[][] mergeQrUp(double[][] qt1, UpperTriangular r1, UpperTriangular r2) {
+    int kp = qt1.length;
+    int r = qt1[0].length;
+
+    double[][] qTilde = new double[kp][];
+    for (int i = 0; i < kp; i++) {
+      qTilde[i] = new double[r];
+    }
+    mergeRonQ(r1, r2, qt1, qTilde);
+    return qt1;
+  }
+
+  public static double[][] mergeQrDown(double[][] r1, double[][] qt2, double[][] r2) {
+    int kp = qt2.length;
+    int r = qt2[0].length;
+
+    double[][] qTilde = new double[kp][];
+    for (int i = 0; i < kp; i++) {
+      qTilde[i] = new double[r];
+    }
+    mergeRonQ(r1, r2, qTilde, qt2);
+    return qTilde;
+
+  }
+
+  public static double[][] mergeQrDown(UpperTriangular r1, double[][] qt2, UpperTriangular r2) {
+    int kp = qt2.length;
+    int r = qt2[0].length;
+
+    double[][] qTilde = new double[kp][];
+    for (int i = 0; i < kp; i++) {
+      qTilde[i] = new double[r];
+    }
+    mergeRonQ(r1, r2, qTilde, qt2);
+    return qTilde;
+
+  }
+
+  public static double[][] computeQtHat(double[][] qt, int i,
+      Iterator<UpperTriangular> rIter) {
+    UpperTriangular rTilde = rIter.next();
+    for (int j = 1; j < i; j++) {
+      mergeR(rTilde, rIter.next());
+    }
+    if (i > 0) {
+      qt = mergeQrDown(rTilde, qt, rIter.next());
+    }
+    while (rIter.hasNext()) {
+      qt = mergeQrUp(qt, rTilde, rIter.next());
+    }
+    return qt;
+  }
+
+  // test helpers
+  public static boolean isOrthonormal(double[][] qt, boolean insufficientRank, double epsilon) {
+    int n = qt.length;
+    int rank = 0;
+    for (int i = 0; i < n; i++) {
+      Vector ei = new DenseVector(qt[i], true);
+
+      double norm = ei.norm(2);
+
+      if (Math.abs(1.0 - norm) < epsilon) {
+        rank++;
+      } else if (Math.abs(norm) > epsilon) {
+        return false; // not a rank deficiency, either
+      }
+
+      for (int j = 0; j <= i; j++) {
+        Vector ej = new DenseVector(qt[j], true);
+        double dot = ei.dot(ej);
+        if (!(Math.abs((i == j && rank > j ? 1.0 : 0.0) - dot) < epsilon)) {
+          return false;
+        }
+      }
+    }
+    return insufficientRank ? rank < n : rank == n;
+  }
+
+  public static boolean isOrthonormalBlocked(Iterable<double[][]> qtHats,
+      boolean insufficientRank, double epsilon) {
+    int n = qtHats.iterator().next().length;
+    int rank = 0;
+    for (int i = 0; i < n; i++) {
+      List<Vector> ei = Lists.newArrayList();
+      // Vector e_i=new DenseVector (qt[i],true);
+      for (double[][] qtHat : qtHats) {
+        ei.add(new DenseVector(qtHat[i], true));
+      }
+
+      double norm = 0;
+      for (Vector v : ei) {
+        norm += v.dot(v);
+      }
+      norm = Math.sqrt(norm);
+      if (Math.abs(1 - norm) < epsilon) {
+        rank++;
+      } else if (Math.abs(norm) > epsilon) {
+        return false; // not a rank deficiency, either
+      }
+
+      for (int j = 0; j <= i; j++) {
+        List<Vector> ej = Lists.newArrayList();
+        for (double[][] qtHat : qtHats) {
+          ej.add(new DenseVector(qtHat[j], true));
+        }
+
+        // Vector e_j = new DenseVector ( qt[j], true);
+        double dot = 0;
+        for (int k = 0; k < ei.size(); k++) {
+          dot += ei.get(k).dot(ej.get(k));
+        }
+        if (!(Math.abs((i == j && rank > j ? 1 : 0) - dot) < epsilon)) {
+          return false;
+        }
+      }
+    }
+    return insufficientRank ? rank < n : rank == n;
+  }
+
+  private static final class TriangularRowView extends AbstractVector {
+    private final UpperTriangular viewed;
+    private int rowNum;
+
+    private TriangularRowView(UpperTriangular viewed) {
+      super(viewed.columnSize());
+      this.viewed = viewed;
+
+    }
+
+    TriangularRowView setViewedRow(int row) {
+      rowNum = row;
+      return this;
+    }
+
+    @Override
+    public boolean isDense() {
+      return true;
+    }
+
+    @Override
+    public boolean isSequentialAccess() {
+      return false;
+    }
+
+    @Override
+    public Iterator<Element> iterator() {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public Iterator<Element> iterateNonZero() {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public double getQuick(int index) {
+      return viewed.getQuick(rowNum, index);
+    }
+
+    @Override
+    public Vector like() {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public Vector like(int cardinality) {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void setQuick(int index, double value) {
+      viewed.setQuick(rowNum, index, value);
+
+    }
+
+    @Override
+    public int getNumNondefaultElements() {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public double getLookupCost() {
+      return 1;
+    }
+
+    @Override
+    public double getIteratorAdvanceCost() {
+      return 1;
+    }
+
+    @Override
+    public boolean isAddConstantTime() {
+      return true;
+    }
+
+    @Override
+    public Matrix matrixLike(int rows, int columns) {
+      throw new UnsupportedOperationException();
+    }
+
+    /**
+     * Used internally by assign() to update multiple indices and values at once.
+     * Only really useful for sparse vectors (especially SequentialAccessSparseVector).
+     * <p/>
+     * If someone ever adds a new type of sparse vectors, this method must merge (index, value) pairs into the vector.
+     *
+     * @param updates a mapping of indices to values to merge in the vector.
+     */
+    @Override
+    public void mergeUpdates(OrderedIntDoubleMapping updates) {
+      int[] indices = updates.getIndices();
+      double[] values = updates.getValues();
+      for (int i = 0; i < updates.getNumMappings(); ++i) {
+        viewed.setQuick(rowNum, indices[i], values[i]);
+      }
+    }
+
+  }
+
+}
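
For reference, a tiny sketch of the elementary Givens step the solver chains
together; the GivensSketch class and the numbers are hypothetical, and givens()
and applyGivensInPlace() are the public static helpers defined above.

import org.apache.mahout.math.hadoop.stochasticsvd.qr.GivensThinSolver;

public class GivensSketch {
  public static void main(String[] args) {
    // rotate rows so that the leading element of row2 is annihilated -- the
    // elementary step chained to build the thin QR decomposition row by row
    double[] row1 = {3.0, 1.0};
    double[] row2 = {4.0, 2.0};
    double[] cs = new double[2];
    GivensThinSolver.givens(row1[0], row2[0], cs);
    GivensThinSolver.applyGivensInPlace(cs[0], cs[1], row1, row2, 0, 2);
    System.out.println(row2[0]); // ~0.0
  }
}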

http://git-wip-us.apache.org/repos/asf/mahout/blob/5eda9e1f/community/mahout-mr/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/qr/GramSchmidt.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/qr/GramSchmidt.java b/community/mahout-mr/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/qr/GramSchmidt.java
new file mode 100644
index 0000000..09be91f
--- /dev/null
+++ b/community/mahout-mr/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/qr/GramSchmidt.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.mahout.math.hadoop.stochasticsvd.qr;
+
+import org.apache.mahout.math.Matrix;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.function.DoubleFunction;
+
+/**
+ * Gram Schmidt quick helper.
+ */
+public final class GramSchmidt {
+
+  private GramSchmidt() {
+  }
+
+  public static void orthonormalizeColumns(Matrix mx) {
+
+    int n = mx.numCols();
+
+    for (int c = 0; c < n; c++) {
+      Vector col = mx.viewColumn(c);
+      for (int c1 = 0; c1 < c; c1++) {
+        Vector viewC1 = mx.viewColumn(c1);
+        col.assign(col.minus(viewC1.times(viewC1.dot(col))));
+
+      }
+      final double norm2 = col.norm(2);
+      col.assign(new DoubleFunction() {
+        @Override
+        public double apply(double x) {
+          return x / norm2;
+        }
+      });
+    }
+  }
+
+}
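
For reference, a small usage sketch; the GramSchmidtSketch class and the matrix
values are hypothetical.

import org.apache.mahout.math.DenseMatrix;
import org.apache.mahout.math.Matrix;
import org.apache.mahout.math.hadoop.stochasticsvd.qr.GramSchmidt;

public class GramSchmidtSketch {
  public static void main(String[] args) {
    // orthonormalize the columns of a small full-rank matrix in place
    Matrix mx = new DenseMatrix(new double[][] {{1, 1}, {0, 1}, {1, 0}});
    GramSchmidt.orthonormalizeColumns(mx);
    // columns now have unit 2-norm and are mutually orthogonal
    System.out.println(mx.viewColumn(0).dot(mx.viewColumn(1))); // ~0.0
    System.out.println(mx.viewColumn(1).norm(2));               // ~1.0
  }
}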

http://git-wip-us.apache.org/repos/asf/mahout/blob/5eda9e1f/community/mahout-mr/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/qr/QRFirstStep.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/qr/QRFirstStep.java b/community/mahout-mr/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/qr/QRFirstStep.java
new file mode 100644
index 0000000..8509e0a
--- /dev/null
+++ b/community/mahout-mr/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/qr/QRFirstStep.java
@@ -0,0 +1,284 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.mahout.math.hadoop.stochasticsvd.qr;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Deque;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.lib.MultipleOutputs;
+import org.apache.mahout.common.IOUtils;
+import org.apache.mahout.common.iterator.CopyConstructorIterator;
+import org.apache.mahout.math.DenseVector;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.Vector.Element;
+import org.apache.mahout.math.VectorWritable;
+import org.apache.mahout.math.hadoop.stochasticsvd.DenseBlockWritable;
+import org.apache.mahout.math.UpperTriangular;
+
+import com.google.common.collect.Lists;
+import com.google.common.io.Closeables;
+
+/**
+ * First step of the QR decomposition, expressed without MapReduce abstractions,
+ * purely in terms of iterators and collectors (although OutputCollector is
+ * arguably an outdated API).
+ */
+@SuppressWarnings("deprecation")
+public class QRFirstStep implements Closeable, OutputCollector<Writable, Vector> {
+
+  public static final String PROP_K = "ssvd.k";
+  public static final String PROP_P = "ssvd.p";
+  public static final String PROP_AROWBLOCK_SIZE = "ssvd.arowblock.size";
+
+  private int kp;
+  private List<double[]> yLookahead;
+  private GivensThinSolver qSolver;
+  private int blockCnt;
+  private final DenseBlockWritable value = new DenseBlockWritable();
+  private final Writable tempKey = new IntWritable();
+  private MultipleOutputs outputs;
+  private final Deque<Closeable> closeables = Lists.newLinkedList();
+  private SequenceFile.Writer tempQw;
+  private Path tempQPath;
+  private final List<UpperTriangular> rSubseq = Lists.newArrayList();
+  private final Configuration jobConf;
+
+  private final OutputCollector<? super Writable, ? super DenseBlockWritable> qtHatOut;
+  private final OutputCollector<? super Writable, ? super VectorWritable> rHatOut;
+
+  public QRFirstStep(Configuration jobConf,
+                     OutputCollector<? super Writable, ? super DenseBlockWritable> qtHatOut,
+                     OutputCollector<? super Writable, ? super VectorWritable> rHatOut) {
+    this.jobConf = jobConf;
+    this.qtHatOut = qtHatOut;
+    this.rHatOut = rHatOut;
+    setup();
+  }
+
+  @Override
+  public void close() throws IOException {
+    cleanup();
+  }
+
+  public int getKP() {
+    return kp;
+  }
+
+  private void flushSolver() throws IOException {
+    UpperTriangular r = qSolver.getRTilde();
+    double[][] qt = qSolver.getThinQtTilde();
+
+    rSubseq.add(r);
+
+    value.setBlock(qt);
+    getTempQw().append(tempKey, value);
+
+    /*
+     * This probably should be a sparse row matrix, but the compressor handles it
+     * for disk, and in memory we want it dense anyway; sparse random-access
+     * implementations would mostly be a memory-management disaster of rehashes
+     * and GC thrashing.
+     */
+    value.setBlock(null);
+    qSolver.reset();
+  }
+
+  // second pass to run a modified version of computeQHatSequence.
+  private void flushQBlocks() throws IOException {
+    if (blockCnt == 1) {
+      /*
+       * Only one block, no temp file, no second pass; this should be the default
+       * mode for efficiency in most cases. Surely the mapper should be able to
+       * load the entire split in memory -- and we don't even require that.
+       */
+      value.setBlock(qSolver.getThinQtTilde());
+      outputQHat(value);
+      outputR(new VectorWritable(new DenseVector(qSolver.getRTilde().getData(),
+                                                 true)));
+
+    } else {
+      secondPass();
+    }
+  }
+
+  private void outputQHat(DenseBlockWritable value) throws IOException {
+    qtHatOut.collect(NullWritable.get(), value);
+  }
+
+  private void outputR(VectorWritable value) throws IOException {
+    rHatOut.collect(NullWritable.get(), value);
+  }
+
+  private void secondPass() throws IOException {
+    qSolver = null; // release mem
+    FileSystem localFs = FileSystem.getLocal(jobConf);
+    SequenceFile.Reader tempQr =
+      new SequenceFile.Reader(localFs, tempQPath, jobConf);
+    closeables.addFirst(tempQr);
+    int qCnt = 0;
+    while (tempQr.next(tempKey, value)) {
+      value
+        .setBlock(GivensThinSolver.computeQtHat(value.getBlock(),
+                                                qCnt,
+                                                new CopyConstructorIterator<>(rSubseq.iterator())));
+      if (qCnt == 1) {
+        /*
+         * just merge r[0] <- r[1] so it doesn't have to be repeated in
+         * subsequent computeQtHat iterations
+         */
+        GivensThinSolver.mergeR(rSubseq.get(0), rSubseq.remove(1));
+      } else {
+        qCnt++;
+      }
+      outputQHat(value);
+    }
+
+    assert rSubseq.size() == 1;
+
+    outputR(new VectorWritable(new DenseVector(rSubseq.get(0).getData(), true)));
+
+  }
+
+  protected void map(Vector incomingYRow) throws IOException {
+    double[] yRow;
+    if (yLookahead.size() == kp) {
+      if (qSolver.isFull()) {
+
+        flushSolver();
+        blockCnt++;
+
+      }
+      yRow = yLookahead.remove(0);
+
+      qSolver.appendRow(yRow);
+    } else {
+      yRow = new double[kp];
+    }
+
+    if (incomingYRow.isDense()) {
+      for (int i = 0; i < kp; i++) {
+        yRow[i] = incomingYRow.get(i);
+      }
+    } else {
+      Arrays.fill(yRow, 0);
+      for (Element yEl : incomingYRow.nonZeroes()) {
+        yRow[yEl.index()] = yEl.get();
+      }
+    }
+
+    yLookahead.add(yRow);
+  }
+
+  protected void setup() {
+
+    int r = Integer.parseInt(jobConf.get(PROP_AROWBLOCK_SIZE));
+    int k = Integer.parseInt(jobConf.get(PROP_K));
+    int p = Integer.parseInt(jobConf.get(PROP_P));
+    kp = k + p;
+
+    yLookahead = Lists.newArrayListWithCapacity(kp);
+    qSolver = new GivensThinSolver(r, kp);
+    outputs = new MultipleOutputs(new JobConf(jobConf));
+    closeables.addFirst(new Closeable() {
+      @Override
+      public void close() throws IOException {
+        outputs.close();
+      }
+    });
+
+  }
+
+  protected void cleanup() throws IOException {
+    try {
+      if (qSolver == null && yLookahead.isEmpty()) {
+        return;
+      }
+      if (qSolver == null) {
+        qSolver = new GivensThinSolver(yLookahead.size(), kp);
+      }
+      // grow q solver up if necessary
+
+      qSolver.adjust(qSolver.getCnt() + yLookahead.size());
+      while (!yLookahead.isEmpty()) {
+
+        qSolver.appendRow(yLookahead.remove(0));
+
+      }
+      assert qSolver.isFull();
+      if (++blockCnt > 1) {
+        flushSolver();
+        assert tempQw != null;
+        closeables.remove(tempQw);
+        Closeables.close(tempQw, false);
+      }
+      flushQBlocks();
+
+    } finally {
+      IOUtils.close(closeables);
+    }
+
+  }
+
+  private SequenceFile.Writer getTempQw() throws IOException {
+    if (tempQw == null) {
+      /*
+       * The temporary Q output will hopefully not exceed the size of the IO
+       * cache, in which case this works well because the data is managed by the
+       * kernel rather than the Java GC. And if the IO cache is not large enough,
+       * then at least the access pattern is always sequential.
+       */
+      String taskTmpDir = System.getProperty("java.io.tmpdir");
+
+      FileSystem localFs = FileSystem.getLocal(jobConf);
+      Path parent = new Path(taskTmpDir);
+      Path sub = new Path(parent, "qw_" + System.currentTimeMillis());
+      tempQPath = new Path(sub, "q-temp.seq");
+      tempQw =
+        SequenceFile.createWriter(localFs,
+                                  jobConf,
+                                  tempQPath,
+                                  IntWritable.class,
+                                  DenseBlockWritable.class,
+                                  CompressionType.BLOCK);
+      closeables.addFirst(tempQw);
+      closeables.addFirst(new IOUtils.DeleteFileOnClose(new File(tempQPath
+        .toString())));
+    }
+    return tempQw;
+  }
+
+  @Override
+  public void collect(Writable key, Vector vw) throws IOException {
+    map(vw);
+  }
+
+}
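
For orientation, a hedged sketch of driving the class above outside of a MapReduce job (the example class name, the k/p/block-size values, and the in-memory collectors are hypothetical; rows of Y are assumed to already have k+p elements):

  import java.io.IOException;
  import java.util.List;

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.io.NullWritable;
  import org.apache.hadoop.io.Writable;
  import org.apache.hadoop.mapred.OutputCollector;
  import org.apache.mahout.math.Vector;
  import org.apache.mahout.math.VectorWritable;
  import org.apache.mahout.math.hadoop.stochasticsvd.DenseBlockWritable;
  import org.apache.mahout.math.hadoop.stochasticsvd.qr.QRFirstStep;

  import com.google.common.collect.Lists;

  public class QRFirstStepExample {

    /** Feeds rows of Y into QRFirstStep and gathers its two outputs in memory. */
    public static void runFirstStep(Iterable<Vector> yRows) throws IOException {
      Configuration conf = new Configuration();
      conf.set(QRFirstStep.PROP_K, "10");                // illustrative k
      conf.set(QRFirstStep.PROP_P, "15");                // illustrative oversampling p
      conf.set(QRFirstStep.PROP_AROWBLOCK_SIZE, "1000"); // illustrative row block size

      final List<VectorWritable> rHats = Lists.newArrayList();

      OutputCollector<Writable, DenseBlockWritable> qtHatOut =
          new OutputCollector<Writable, DenseBlockWritable>() {
            @Override
            public void collect(Writable key, DenseBlockWritable value) throws IOException {
              // QRFirstStep may reuse this writable instance, so a real consumer
              // should serialize or copy the block here rather than keep a reference.
            }
          };
      OutputCollector<Writable, VectorWritable> rHatOut =
          new OutputCollector<Writable, VectorWritable>() {
            @Override
            public void collect(Writable key, VectorWritable value) throws IOException {
              rHats.add(new VectorWritable(value.get().clone()));
            }
          };

      QRFirstStep firstStep = new QRFirstStep(conf, qtHatOut, rHatOut);
      try {
        for (Vector yRow : yRows) {
          firstStep.collect(NullWritable.get(), yRow); // the key is ignored
        }
      } finally {
        firstStep.close(); // flushes the last block and emits R-hat
      }
    }
  }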

http://git-wip-us.apache.org/repos/asf/mahout/blob/5eda9e1f/community/mahout-mr/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/qr/QRLastStep.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/qr/QRLastStep.java b/community/mahout-mr/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/qr/QRLastStep.java
new file mode 100644
index 0000000..545f1f9
--- /dev/null
+++ b/community/mahout-mr/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/qr/QRLastStep.java
@@ -0,0 +1,144 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.hadoop.stochasticsvd.qr;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+import org.apache.commons.lang3.Validate;
+import org.apache.mahout.common.iterator.CopyConstructorIterator;
+import org.apache.mahout.math.DenseVector;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.VectorWritable;
+import org.apache.mahout.math.hadoop.stochasticsvd.DenseBlockWritable;
+import org.apache.mahout.math.UpperTriangular;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Second/last step of the QR iterations. Takes the qtHat and rHat outputs of the
+ * first step and provides an iterator over the finished rows of the final Q.
+ * 
+ */
+public class QRLastStep implements Closeable, Iterator<Vector> {
+
+  private final Iterator<DenseBlockWritable> qHatInput;
+
+  private final List<UpperTriangular> mRs = Lists.newArrayList();
+  private final int blockNum;
+  private double[][] mQt;
+  private int cnt;
+  private int r;
+  private int kp;
+  private Vector qRow;
+
+  /**
+   * 
+   * @param qHatInput
+   *          the Q-Hat input that was output in the first step
+   * @param rHatInput
+   *          all RHat outputs in the group, in group order
+   * @param blockNum
+   *          our RHat number in the group
+   */
+  public QRLastStep(Iterator<DenseBlockWritable> qHatInput,
+                    Iterator<VectorWritable> rHatInput,
+                    int blockNum) {
+    this.blockNum = blockNum;
+    this.qHatInput = qHatInput;
+    /*
+     * in this implementation we actually preload all Rs into memory to make R
+     * sequence modifications more efficient.
+     */
+    int block = 0;
+    while (rHatInput.hasNext()) {
+      Vector value = rHatInput.next().get();
+      if (block < blockNum && block > 0) {
+        GivensThinSolver.mergeR(mRs.get(0), new UpperTriangular(value));
+      } else {
+        mRs.add(new UpperTriangular(value));
+      }
+      block++;
+    }
+
+  }
+
+  private boolean loadNextQt() {
+    boolean more = qHatInput.hasNext();
+    if (!more) {
+      return false;
+    }
+    DenseBlockWritable v = qHatInput.next();
+    mQt =
+      GivensThinSolver
+        .computeQtHat(v.getBlock(),
+                      blockNum == 0 ? 0 : 1,
+                      new CopyConstructorIterator<>(mRs.iterator()));
+    r = mQt[0].length;
+    kp = mQt.length;
+    if (qRow == null) {
+      qRow = new DenseVector(kp);
+    }
+    return true;
+  }
+
+  @Override
+  public boolean hasNext() {
+    if (mQt != null && cnt == r) {
+      mQt = null;
+    }
+    boolean result = true;
+    if (mQt == null) {
+      result = loadNextQt();
+      cnt = 0;
+    }
+    return result;
+  }
+
+  @Override
+  public Vector next() {
+    if (!hasNext()) {
+      throw new NoSuchElementException();
+    }
+    Validate.isTrue(hasNext(), "Q input overrun");
+    /*
+     * because Q blocks are initially stored in reverse order
+     */
+    int qRowIndex = r - cnt - 1; 
+    for (int j = 0; j < kp; j++) {
+      qRow.setQuick(j, mQt[j][qRowIndex]);
+    }
+    cnt++;
+    return qRow;
+  }
+
+  @Override
+  public void remove() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void close() throws IOException {
+    mQt = null;
+    mRs.clear();
+  }
+
+}
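
A hedged sketch of consuming the iterator above, assuming the Q-hat blocks and R-hat vectors produced by the first step have already been re-read into plain iterators (qHatIt, rHatIt and process(...) are placeholders, not part of the patch):

  // qHatIt   : Iterator<DenseBlockWritable> over this group's Q-hat blocks
  // rHatIt   : Iterator<VectorWritable> over all R-hat vectors of the group, in group order
  // blockNum : this task's R-hat ordinal within the group
  QRLastStep lastStep = new QRLastStep(qHatIt, rHatIt, blockNum);
  try {
    while (lastStep.hasNext()) {
      Vector qRow = lastStep.next();
      // The same DenseVector instance is reused between next() calls,
      // so clone() it if the row must outlive this iteration.
      process(qRow.clone());
    }
  } finally {
    lastStep.close();
  }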

http://git-wip-us.apache.org/repos/asf/mahout/blob/5eda9e1f/community/mahout-mr/src/main/java/org/apache/mahout/math/neighborhood/BruteSearch.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/src/main/java/org/apache/mahout/math/neighborhood/BruteSearch.java b/community/mahout-mr/src/main/java/org/apache/mahout/math/neighborhood/BruteSearch.java
new file mode 100644
index 0000000..51484c7
--- /dev/null
+++ b/community/mahout-mr/src/main/java/org/apache/mahout/math/neighborhood/BruteSearch.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.neighborhood;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.PriorityQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Ordering;
+import org.apache.mahout.common.distance.DistanceMeasure;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.WeightedVector;
+import org.apache.mahout.math.random.WeightedThing;
+
+/**
+ * Search for nearest neighbors using a complete search (i.e. looping through
+ * the references and comparing each vector to the query).
+ */
+public class BruteSearch extends UpdatableSearcher {
+  /**
+   * The list of reference vectors.
+   */
+  private final List<Vector> referenceVectors;
+
+  public BruteSearch(DistanceMeasure distanceMeasure) {
+    super(distanceMeasure);
+    referenceVectors = Lists.newArrayList();
+  }
+
+  @Override
+  public void add(Vector vector) {
+    referenceVectors.add(vector);
+  }
+
+  @Override
+  public int size() {
+    return referenceVectors.size();
+  }
+
+  /**
+   * Scans the list of reference vectors one at a time for @limit neighbors of
+   * the query vector.
+   * The weights of the WeightedVectors are not taken into account.
+   *
+   * @param query     The query vector.
+   * @param limit The number of results to return; must be at least 1.
+   * @return A list of the closest @limit neighbors for the given query.
+   */
+  @Override
+  public List<WeightedThing<Vector>> search(Vector query, int limit) {
+    Preconditions.checkArgument(limit > 0, "limit must be greater than 0!");
+    limit = Math.min(limit, referenceVectors.size());
+    // A priority queue of the best @limit elements, ordered from worst to best so that the worst
+    // element is always on top and can easily be removed.
+    PriorityQueue<WeightedThing<Integer>> bestNeighbors =
+        new PriorityQueue<>(limit, Ordering.natural().reverse());
+    // The resulting list of weighted WeightedVectors (the weight is the distance from the query).
+    List<WeightedThing<Vector>> results =
+        Lists.newArrayListWithCapacity(limit);
+    int rowNumber = 0;
+    for (Vector row : referenceVectors) {
+      double distance = distanceMeasure.distance(query, row);
+      // Only add a new neighbor if the result is better than the worst element
+      // in the queue or the queue isn't full.
+      if (bestNeighbors.size() < limit || bestNeighbors.peek().getWeight() > distance) {
+        bestNeighbors.add(new WeightedThing<>(rowNumber, distance));
+        if (bestNeighbors.size() > limit) {
+          bestNeighbors.poll();
+        } else {
+          // Increase the size of the results list by 1 so we can add elements in the reverse
+          // order from the queue.
+          results.add(null);
+        }
+      }
+      ++rowNumber;
+    }
+    for (int i = limit - 1; i >= 0; --i) {
+      WeightedThing<Integer> neighbor = bestNeighbors.poll();
+      results.set(i, new WeightedThing<>(
+          referenceVectors.get(neighbor.getValue()), neighbor.getWeight()));
+    }
+    return results;
+  }
+
+  /**
+   * Returns the closest vector to the query.
+   * When only the single nearest vector is needed, use this method rather than
+   * search(query, limit), because it is faster (less overhead).
+   *
+   * @param query the vector to search for
+   * @param differentThanQuery if true, returns the closest vector different than the query (this
+   *                           only matters if the query is among the searched vectors), otherwise,
+   *                           returns the closest vector to the query (even the same vector).
+   * @return the weighted vector closest to the query
+   */
+  @Override
+  public WeightedThing<Vector> searchFirst(Vector query, boolean differentThanQuery) {
+    double bestDistance = Double.POSITIVE_INFINITY;
+    Vector bestVector = null;
+    for (Vector row : referenceVectors) {
+      double distance = distanceMeasure.distance(query, row);
+      if (distance < bestDistance && (!differentThanQuery || !row.equals(query))) {
+        bestDistance = distance;
+        bestVector = row;
+      }
+    }
+    return new WeightedThing<>(bestVector, bestDistance);
+  }
+
+  /**
+   * Runs a batch of queries in parallel using a fixed-size thread pool.
+   *
+   * @param queries The queries to search for.
+   * @param limit The number of results to return.
+   * @param numThreads   Number of threads to use in searching.
+   * @return A list of result lists.
+   */
+  public List<List<WeightedThing<Vector>>> search(Iterable<WeightedVector> queries,
+                                                  final int limit, int numThreads) throws InterruptedException {
+    ExecutorService executor = Executors.newFixedThreadPool(numThreads);
+    List<Callable<Object>> tasks = Lists.newArrayList();
+
+    final List<List<WeightedThing<Vector>>> results = Lists.newArrayList();
+    int i = 0;
+    for (final Vector query : queries) {
+      results.add(null);
+      final int index = i++;
+      tasks.add(new Callable<Object>() {
+        @Override
+        public Object call() throws Exception {
+          results.set(index, BruteSearch.this.search(query, limit));
+          return null;
+        }
+      });
+    }
+
+    executor.invokeAll(tasks);
+    executor.shutdown();
+
+    return results;
+  }
+
+  @Override
+  public Iterator<Vector> iterator() {
+    return referenceVectors.iterator();
+  }
+
+  @Override
+  public boolean remove(Vector query, double epsilon) {
+    int rowNumber = 0;
+    for (Vector row : referenceVectors) {
+      double distance = distanceMeasure.distance(query, row);
+      if (distance < epsilon) {
+        referenceVectors.remove(rowNumber);
+        return true;
+      }
+      rowNumber++;
+    }
+    return false;
+  }
+
+  @Override
+  public void clear() {
+    referenceVectors.clear();
+  }
+}
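
Finally, a small usage sketch for the searcher above (the vectors, the example class name, and the EuclideanDistanceMeasure choice are illustrative only):

  import java.util.List;

  import org.apache.mahout.common.distance.EuclideanDistanceMeasure;
  import org.apache.mahout.math.DenseVector;
  import org.apache.mahout.math.Vector;
  import org.apache.mahout.math.neighborhood.BruteSearch;
  import org.apache.mahout.math.random.WeightedThing;

  public class BruteSearchExample {
    public static void main(String[] args) {
      BruteSearch searcher = new BruteSearch(new EuclideanDistanceMeasure());
      searcher.add(new DenseVector(new double[] {0, 0}));
      searcher.add(new DenseVector(new double[] {1, 1}));
      searcher.add(new DenseVector(new double[] {5, 5}));

      Vector query = new DenseVector(new double[] {0.9, 1.2});

      // Single best match; the weight of the result is its distance to the query.
      WeightedThing<Vector> best = searcher.searchFirst(query, false);
      System.out.println(best.getValue() + " at distance " + best.getWeight());

      // Two nearest neighbors, closest first.
      List<WeightedThing<Vector>> top2 = searcher.search(query, 2);
      for (WeightedThing<Vector> hit : top2) {
        System.out.println(hit.getValue() + " at distance " + hit.getWeight());
      }
    }
  }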