Posted to commits@mahout.apache.org by ra...@apache.org on 2018/06/04 14:29:04 UTC

[02/53] [abbrv] [partial] mahout git commit: end of day 6-2-2018

http://git-wip-us.apache.org/repos/asf/mahout/blob/5eda9e1f/community/mahout-mr/src/test/java/org/apache/mahout/math/hadoop/TestDistributedRowMatrix.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/src/test/java/org/apache/mahout/math/hadoop/TestDistributedRowMatrix.java b/community/mahout-mr/src/test/java/org/apache/mahout/math/hadoop/TestDistributedRowMatrix.java
new file mode 100644
index 0000000..13da38a
--- /dev/null
+++ b/community/mahout-mr/src/test/java/org/apache/mahout/math/hadoop/TestDistributedRowMatrix.java
@@ -0,0 +1,395 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.hadoop;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.mahout.clustering.ClusteringTestUtils;
+import org.apache.mahout.common.HadoopUtil;
+import org.apache.mahout.common.MahoutTestCase;
+import org.apache.mahout.common.iterator.sequencefile.PathFilters;
+import org.apache.mahout.math.DenseVector;
+import org.apache.mahout.math.Matrix;
+import org.apache.mahout.math.MatrixSlice;
+import org.apache.mahout.math.RandomAccessSparseVector;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.VectorIterable;
+import org.apache.mahout.math.VectorWritable;
+import org.apache.mahout.math.decomposer.SolverTest;
+import org.apache.mahout.math.function.Functions;
+import org.junit.Test;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Maps;
+
+public final class TestDistributedRowMatrix extends MahoutTestCase {
+  public static final String TEST_PROPERTY_KEY = "test.property.key";
+  public static final String TEST_PROPERTY_VALUE = "test.property.value";
+
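+  // Compares two VectorIterables row-by-row: a row missing on one side is
+  // accepted only if its counterpart is null or has zero norm; otherwise the
+  // paired rows must be within errorTolerance in squared distance.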
+  private static void assertEquals(VectorIterable m, VectorIterable mtt, double errorTolerance) {
+    Iterator<MatrixSlice> mIt = m.iterateAll();
+    Iterator<MatrixSlice> mttIt = mtt.iterateAll();
+    Map<Integer, Vector> mMap = Maps.newHashMap();
+    Map<Integer, Vector> mttMap = Maps.newHashMap();
+    while (mIt.hasNext() && mttIt.hasNext()) {
+      MatrixSlice ms = mIt.next();
+      mMap.put(ms.index(), ms.vector());
+      MatrixSlice mtts = mttIt.next();
+      mttMap.put(mtts.index(), mtts.vector());
+    }
+    for (Map.Entry<Integer, Vector> entry : mMap.entrySet()) {
+      Integer key = entry.getKey();
+      Vector value = entry.getValue();
+      if (value == null || mttMap.get(key) == null) {
+        assertTrue(value == null || value.norm(2) == 0);
+        assertTrue(mttMap.get(key) == null || mttMap.get(key).norm(2) == 0);
+      } else {
+        assertTrue(
+            value.getDistanceSquared(mttMap.get(key)) < errorTolerance);
+      }
+    }
+  }
+
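+  // Transposing twice should reproduce the original matrix within EPSILON.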
+  @Test
+  public void testTranspose() throws Exception {
+    DistributedRowMatrix m = randomDistributedMatrix(10, 9, 5, 4, 1.0, false);
+    m.setConf(getConfiguration());
+    DistributedRowMatrix mt = m.transpose();
+    mt.setConf(getConfiguration());
+
+    Path tmpPath = getTestTempDirPath();
+    m.setOutputTempPathString(tmpPath.toString());
+    Path tmpOutPath = new Path(tmpPath, "tmpOutTranspose");
+    mt.setOutputTempPathString(tmpOutPath.toString());
+    HadoopUtil.delete(getConfiguration(), tmpOutPath);
+    DistributedRowMatrix mtt = mt.transpose();
+    assertEquals(m, mtt, EPSILON);
+  }
+
+  @Test
+  public void testMatrixColumnMeansJob() throws Exception {
+    Matrix m =
+        SolverTest.randomSequentialAccessSparseMatrix(100, 90, 50, 20, 1.0);
+    DistributedRowMatrix dm =
+        randomDistributedMatrix(100, 90, 50, 20, 1.0, false);
+    dm.setConf(getConfiguration());
+
+    Vector expected = new DenseVector(50);
+    for (int i = 0; i < m.numRows(); i++) {
+      expected.assign(m.viewRow(i), Functions.PLUS);
+    }
+    expected.assign(Functions.DIV, m.numRows());
+    Vector actual = dm.columnMeans("DenseVector");
+    assertEquals(0.0, expected.getDistanceSquared(actual), EPSILON);
+  }
+
+  @Test
+  public void testNullMatrixColumnMeansJob() throws Exception {
+    Matrix m =
+        SolverTest.randomSequentialAccessSparseMatrix(100, 90, 0, 0, 1.0);
+    DistributedRowMatrix dm =
+        randomDistributedMatrix(100, 90, 0, 0, 1.0, false);
+    dm.setConf(getConfiguration());
+
+    Vector expected = new DenseVector(0);
+    for (int i = 0; i < m.numRows(); i++) {
+      expected.assign(m.viewRow(i), Functions.PLUS);
+    }
+    expected.assign(Functions.DIV, m.numRows());
+    Vector actual = dm.columnMeans();
+    assertEquals(0.0, expected.getDistanceSquared(actual), EPSILON);
+  }
+
+  @Test
+  public void testMatrixTimesVector() throws Exception {
+    Vector v = new RandomAccessSparseVector(50);
+    v.assign(1.0);
+    Matrix m = SolverTest.randomSequentialAccessSparseMatrix(100, 90, 50, 20, 1.0);
+    DistributedRowMatrix dm = randomDistributedMatrix(100, 90, 50, 20, 1.0, false);
+    dm.setConf(getConfiguration());
+
+    Vector expected = m.times(v);
+    Vector actual = dm.times(v);
+    assertEquals(0.0, expected.getDistanceSquared(actual), EPSILON);
+  }
+
+  @Test
+  public void testMatrixTimesSquaredVector() throws Exception {
+    Vector v = new RandomAccessSparseVector(50);
+    v.assign(1.0);
+    Matrix m = SolverTest.randomSequentialAccessSparseMatrix(100, 90, 50, 20, 1.0);
+    DistributedRowMatrix dm = randomDistributedMatrix(100, 90, 50, 20, 1.0, false);
+    dm.setConf(getConfiguration());
+
+    Vector expected = m.timesSquared(v);
+    Vector actual = dm.timesSquared(v);
+    assertEquals(0.0, expected.getDistanceSquared(actual), 1.0e-9);
+  }
+
+  @Test
+  public void testMatrixTimesMatrix() throws Exception {
+    Matrix inputA = SolverTest.randomSequentialAccessSparseMatrix(20, 19, 15, 5, 10.0);
+    Matrix inputB = SolverTest.randomSequentialAccessSparseMatrix(20, 13, 25, 10, 5.0);
+    Matrix expected = inputA.transpose().times(inputB);
+
+    DistributedRowMatrix distA = randomDistributedMatrix(20, 19, 15, 5, 10.0, false, "distA");
+    distA.setConf(getConfiguration());
+    DistributedRowMatrix distB = randomDistributedMatrix(20, 13, 25, 10, 5.0, false, "distB");
+    distB.setConf(getConfiguration());
+    DistributedRowMatrix product = distA.times(distB);
+
+    assertEquals(expected, product, EPSILON);
+  }
+
+  @Test
+  public void testMatrixMultiplicationJobConfBuilder() throws Exception {
+    Configuration initialConf = createInitialConf();
+
+    Path baseTmpDirPath = getTestTempDirPath("testpaths");
+    Path aPath = new Path(baseTmpDirPath, "a");
+    Path bPath = new Path(baseTmpDirPath, "b");
+    Path outPath = new Path(baseTmpDirPath, "out");
+
+    Configuration mmJobConf = MatrixMultiplicationJob.createMatrixMultiplyJobConf(aPath, bPath, outPath, 10);
+    Configuration mmCustomJobConf = MatrixMultiplicationJob.createMatrixMultiplyJobConf(initialConf,
+                                                                                        aPath,
+                                                                                        bPath,
+                                                                                        outPath,
+                                                                                        10);
+
+    assertNull(mmJobConf.get(TEST_PROPERTY_KEY));
+    assertEquals(TEST_PROPERTY_VALUE, mmCustomJobConf.get(TEST_PROPERTY_KEY));
+  }
+
+  @Test
+  public void testTransposeJobConfBuilder() throws Exception {
+    Configuration initialConf = createInitialConf();
+
+    Path baseTmpDirPath = getTestTempDirPath("testpaths");
+    Path inputPath = new Path(baseTmpDirPath, "input");
+    Path outputPath = new Path(baseTmpDirPath, "output");
+
+    Configuration transposeJobConf = TransposeJob.buildTransposeJob(inputPath, outputPath, 10).getConfiguration();
+
+    Configuration transposeCustomJobConf = TransposeJob.buildTransposeJob(initialConf, inputPath, outputPath, 10)
+                                                       .getConfiguration();
+
+    assertNull(transposeJobConf.get(TEST_PROPERTY_KEY));
+    assertEquals(TEST_PROPERTY_VALUE, transposeCustomJobConf.get(TEST_PROPERTY_KEY));
+  }
+
+  @Test
+  public void testTimesSquaredJobConfBuilders() throws Exception {
+    Configuration initialConf = createInitialConf();
+
+    Path baseTmpDirPath = getTestTempDirPath("testpaths");
+    Path inputPath = new Path(baseTmpDirPath, "input");
+    Path outputPath = new Path(baseTmpDirPath, "output");
+
+    Vector v = new RandomAccessSparseVector(50);
+    v.assign(1.0);
+
+    Job timesSquaredJob1 = TimesSquaredJob.createTimesSquaredJob(v, inputPath, outputPath);
+    Job customTimesSquaredJob1 = TimesSquaredJob.createTimesSquaredJob(initialConf, v, inputPath, outputPath);
+
+    assertNull(timesSquaredJob1.getConfiguration().get(TEST_PROPERTY_KEY));
+    assertEquals(TEST_PROPERTY_VALUE, customTimesSquaredJob1.getConfiguration().get(TEST_PROPERTY_KEY));
+
+    Job timesJob = TimesSquaredJob.createTimesJob(v, 50, inputPath, outputPath);
+    Job customTimesJob = TimesSquaredJob.createTimesJob(initialConf, v, 50, inputPath, outputPath);
+
+    assertNull(timesJob.getConfiguration().get(TEST_PROPERTY_KEY));
+    assertEquals(TEST_PROPERTY_VALUE, customTimesJob.getConfiguration().get(TEST_PROPERTY_KEY));
+
+    Job timesSquaredJob2 = TimesSquaredJob.createTimesSquaredJob(v, inputPath, outputPath,
+        TimesSquaredJob.TimesSquaredMapper.class, TimesSquaredJob.VectorSummingReducer.class);
+
+    Job customTimesSquaredJob2 = TimesSquaredJob.createTimesSquaredJob(initialConf, v, inputPath,
+        outputPath, TimesSquaredJob.TimesSquaredMapper.class, TimesSquaredJob.VectorSummingReducer.class);
+
+    assertNull(timesSquaredJob2.getConfiguration().get(TEST_PROPERTY_KEY));
+    assertEquals(TEST_PROPERTY_VALUE, customTimesSquaredJob2.getConfiguration().get(TEST_PROPERTY_KEY));
+
+    Job timesSquaredJob3 = TimesSquaredJob.createTimesSquaredJob(v, 50, inputPath, outputPath,
+        TimesSquaredJob.TimesSquaredMapper.class, TimesSquaredJob.VectorSummingReducer.class);
+
+    Job customTimesSquaredJob3 = TimesSquaredJob.createTimesSquaredJob(initialConf,
+        v, 50, inputPath, outputPath, TimesSquaredJob.TimesSquaredMapper.class,
+        TimesSquaredJob.VectorSummingReducer.class);
+
+    assertNull(timesSquaredJob3.getConfiguration().get(TEST_PROPERTY_KEY));
+    assertEquals(TEST_PROPERTY_VALUE, customTimesSquaredJob3.getConfiguration().get(TEST_PROPERTY_KEY));
+  }
+
+  @Test
+  public void testTimesVectorTempDirDeletion() throws Exception {
+    Configuration conf = getConfiguration();
+    Vector v = new RandomAccessSparseVector(50);
+    v.assign(1.0);
+    DistributedRowMatrix dm = randomDistributedMatrix(100, 90, 50, 20, 1.0, false);
+    dm.setConf(conf);
+
+    Path outputPath = dm.getOutputTempPath();
+    FileSystem fs = outputPath.getFileSystem(conf);
+
+    deleteContentsOfPath(conf, outputPath);
+
+    assertEquals(0, HadoopUtil.listStatus(fs, outputPath).length);
+
+    Vector result1 = dm.times(v);
+
+    assertEquals(0, HadoopUtil.listStatus(fs, outputPath).length);
+
+    deleteContentsOfPath(conf, outputPath);
+    assertEquals(0, HadoopUtil.listStatus(fs, outputPath).length);
+
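+    // With KEEP_TEMP_FILES set, the next multiplication should leave its temp
+    // directory (holding the input and output vector files) in place.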
+    conf.setBoolean(DistributedRowMatrix.KEEP_TEMP_FILES, true);
+    dm.setConf(conf);
+
+    Vector result2 = dm.times(v);
+
+    FileStatus[] outputStatuses = fs.listStatus(outputPath);
+    assertEquals(1, outputStatuses.length);
+    Path outputTempPath = outputStatuses[0].getPath();
+    Path inputVectorPath = new Path(outputTempPath, TimesSquaredJob.INPUT_VECTOR);
+    Path outputVectorPath = new Path(outputTempPath, TimesSquaredJob.OUTPUT_VECTOR_FILENAME);
+    assertEquals(1, fs.listStatus(inputVectorPath, PathFilters.logsCRCFilter()).length);
+    assertEquals(1, fs.listStatus(outputVectorPath, PathFilters.logsCRCFilter()).length);
+
+    assertEquals(0.0, result1.getDistanceSquared(result2), EPSILON);
+  }
+
+  @Test
+  public void testTimesSquaredVectorTempDirDeletion() throws Exception {
+    Configuration conf = getConfiguration();
+    Vector v = new RandomAccessSparseVector(50);
+    v.assign(1.0);
+    DistributedRowMatrix dm = randomDistributedMatrix(100, 90, 50, 20, 1.0, false);
+    dm.setConf(conf);
+
+    Path outputPath = dm.getOutputTempPath();
+    FileSystem fs = outputPath.getFileSystem(conf);
+
+    deleteContentsOfPath(conf, outputPath);
+
+    assertEquals(0, HadoopUtil.listStatus(fs, outputPath).length);
+
+    Vector result1 = dm.timesSquared(v);
+
+    assertEquals(0, HadoopUtil.listStatus(fs, outputPath).length);
+
+    deleteContentsOfPath(conf, outputPath);
+    assertEquals(0, HadoopUtil.listStatus(fs, outputPath).length);
+
+    conf.setBoolean(DistributedRowMatrix.KEEP_TEMP_FILES, true);
+    dm.setConf(conf);
+
+    Vector result2 = dm.timesSquared(v);
+
+    FileStatus[] outputStatuses = fs.listStatus(outputPath);
+    assertEquals(1, outputStatuses.length);
+    Path outputTempPath = outputStatuses[0].getPath();
+    Path inputVectorPath = new Path(outputTempPath, TimesSquaredJob.INPUT_VECTOR);
+    Path outputVectorPath = new Path(outputTempPath, TimesSquaredJob.OUTPUT_VECTOR_FILENAME);
+    assertEquals(1, fs.listStatus(inputVectorPath, PathFilters.logsCRCFilter()).length);
+    assertEquals(1, fs.listStatus(outputVectorPath, PathFilters.logsCRCFilter()).length);
+
+    assertEquals(0.0, result1.getDistanceSquared(result2), EPSILON);
+  }
+
+  public Configuration createInitialConf() throws IOException {
+    Configuration initialConf = getConfiguration();
+    initialConf.set(TEST_PROPERTY_KEY, TEST_PROPERTY_VALUE);
+    return initialConf;
+  }
+
+  private static void deleteContentsOfPath(Configuration conf, Path path) throws Exception {
+    FileSystem fs = path.getFileSystem(conf);
+
+    FileStatus[] statuses = HadoopUtil.listStatus(fs, path);
+    for (FileStatus status : statuses) {
+      fs.delete(status.getPath(), true);
+    }
+  }
+
+  public DistributedRowMatrix randomDistributedMatrix(int numRows,
+                                                      int nonNullRows,
+                                                      int numCols,
+                                                      int entriesPerRow,
+                                                      double entryMean,
+                                                      boolean isSymmetric) throws IOException {
+    return randomDistributedMatrix(numRows, nonNullRows, numCols, entriesPerRow, entryMean, isSymmetric, "testdata");
+  }
+
+  public DistributedRowMatrix randomDenseHierarchicalDistributedMatrix(int numRows,
+                                                                       int numCols,
+                                                                       boolean isSymmetric,
+                                                                       String baseTmpDirSuffix)
+    throws IOException {
+    Path baseTmpDirPath = getTestTempDirPath(baseTmpDirSuffix);
+    Matrix c = SolverTest.randomHierarchicalMatrix(numRows, numCols, isSymmetric);
+    return saveToFs(c, baseTmpDirPath);
+  }
+
+  public DistributedRowMatrix randomDistributedMatrix(int numRows,
+                                                      int nonNullRows,
+                                                      int numCols,
+                                                      int entriesPerRow,
+                                                      double entryMean,
+                                                      boolean isSymmetric,
+                                                      String baseTmpDirSuffix) throws IOException {
+    Path baseTmpDirPath = getTestTempDirPath(baseTmpDirSuffix);
+    Matrix c = SolverTest.randomSequentialAccessSparseMatrix(numRows, nonNullRows, numCols, entriesPerRow, entryMean);
+    if (isSymmetric) {
+      c = c.times(c.transpose());
+    }
+    return saveToFs(c, baseTmpDirPath);
+  }
+
+  private DistributedRowMatrix saveToFs(final Matrix m, Path baseTmpDirPath) throws IOException {
+    Configuration conf = getConfiguration();
+    FileSystem fs = FileSystem.get(baseTmpDirPath.toUri(), conf);
+
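+    // Write every row of the in-memory matrix as a VectorWritable into a single
+    // sequence-file part, then wrap that directory in a DistributedRowMatrix.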
+    ClusteringTestUtils.writePointsToFile(new Iterable<VectorWritable>() {
+      @Override
+      public Iterator<VectorWritable> iterator() {
+        return Iterators.transform(m.iterator(), new Function<MatrixSlice,VectorWritable>() {
+          @Override
+          public VectorWritable apply(MatrixSlice input) {
+            return new VectorWritable(input.vector());
+          }
+        });
+      }
+    }, true, new Path(baseTmpDirPath, "distMatrix/part-00000"), fs, conf);
+
+    DistributedRowMatrix distMatrix = new DistributedRowMatrix(new Path(baseTmpDirPath, "distMatrix"),
+                                                               new Path(baseTmpDirPath, "tmpOut"),
+                                                               m.numRows(),
+                                                               m.numCols());
+    distMatrix.setConf(new Configuration(conf));
+
+    return distMatrix;
+  }
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/5eda9e1f/community/mahout-mr/src/test/java/org/apache/mahout/math/hadoop/decomposer/TestDistributedLanczosSolver.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/src/test/java/org/apache/mahout/math/hadoop/decomposer/TestDistributedLanczosSolver.java b/community/mahout-mr/src/test/java/org/apache/mahout/math/hadoop/decomposer/TestDistributedLanczosSolver.java
new file mode 100644
index 0000000..ac01c28
--- /dev/null
+++ b/community/mahout-mr/src/test/java/org/apache/mahout/math/hadoop/decomposer/TestDistributedLanczosSolver.java
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.hadoop.decomposer;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.mahout.common.MahoutTestCase;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.decomposer.SolverTest;
+import org.apache.mahout.math.decomposer.lanczos.LanczosState;
+import org.apache.mahout.math.hadoop.DistributedRowMatrix;
+import org.apache.mahout.math.hadoop.TestDistributedRowMatrix;
+import org.junit.Before;
+
+import java.io.File;
+import java.io.IOException;
+
+@Deprecated
+public final class TestDistributedLanczosSolver extends MahoutTestCase {
+
+  private int counter = 0;
+  private DistributedRowMatrix symCorpus;
+  private DistributedRowMatrix asymCorpus;
+
+  @Override
+  @Before
+  public void setUp() throws Exception {
+    super.setUp();
+    File symTestData = getTestTempDir("symTestData");
+    File asymTestData = getTestTempDir("asymTestData");
+    symCorpus = new TestDistributedRowMatrix().randomDistributedMatrix(100,
+        90, 80, 2, 10.0, true, symTestData.getAbsolutePath());
+    asymCorpus = new TestDistributedRowMatrix().randomDistributedMatrix(100,
+        90, 80, 2, 10.0, false, asymTestData.getAbsolutePath());
+  }
+
+  private static String suf(boolean symmetric) {
+    return symmetric ? "_sym" : "_asym";
+  }
+
+  private DistributedRowMatrix getCorpus(boolean symmetric) {
+    return symmetric ? symCorpus : asymCorpus;
+  }
+
+  /*
+  private LanczosState doTestDistributedLanczosSolver(boolean symmetric,
+      int desiredRank) throws IOException {
+    return doTestDistributedLanczosSolver(symmetric, desiredRank, true);
+  }
+   */
+
+  private LanczosState doTestDistributedLanczosSolver(boolean symmetric,
+      int desiredRank, boolean hdfsBackedState)
+      throws IOException {
+    DistributedRowMatrix corpus = getCorpus(symmetric);
+    Configuration conf = getConfiguration();
+    corpus.setConf(conf);
+    DistributedLanczosSolver solver = new DistributedLanczosSolver();
+    Vector initialVector = DistributedLanczosSolver.getInitialVector(corpus);
+    LanczosState state;
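+    // Either persist the Lanczos basis to HDFS (so a solve can be resumed) or
+    // keep the state purely in memory.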
+    if (hdfsBackedState) {
+      HdfsBackedLanczosState hState = new HdfsBackedLanczosState(corpus,
+          desiredRank, initialVector, new Path(getTestTempDirPath(),
+              "lanczosStateDir" + suf(symmetric) + counter));
+      hState.setConf(conf);
+      state = hState;
+    } else {
+      state = new LanczosState(corpus, desiredRank, initialVector);
+    }
+    solver.solve(state, desiredRank, symmetric);
+    SolverTest.assertOrthonormal(state);
+    for (int i = 0; i < desiredRank/2; i++) {
+      SolverTest.assertEigen(i, state.getRightSingularVector(i), corpus, 0.1, symmetric);
+    }
+    counter++;
+    return state;
+  }
+
+  public void doTestResumeIteration(boolean symmetric) throws IOException {
+    DistributedRowMatrix corpus = getCorpus(symmetric);
+    Configuration conf = getConfiguration();
+    corpus.setConf(conf);
+    DistributedLanczosSolver solver = new DistributedLanczosSolver();
+    int rank = 10;
+    Vector initialVector = DistributedLanczosSolver.getInitialVector(corpus);
+    HdfsBackedLanczosState state = new HdfsBackedLanczosState(corpus, rank,
+        initialVector, new Path(getTestTempDirPath(), "lanczosStateDir" + suf(symmetric) + counter));
+    solver.solve(state, rank, symmetric);
+
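+    // Double the rank and resume from the persisted state; the resulting basis
+    // should match a fresh all-at-once solve of the same rank (checked below).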
+    rank *= 2;
+    state = new HdfsBackedLanczosState(corpus, rank,
+        initialVector, new Path(getTestTempDirPath(), "lanczosStateDir" + suf(symmetric) + counter));
+    solver = new DistributedLanczosSolver();
+    solver.solve(state, rank, symmetric);
+
+    LanczosState allAtOnceState = doTestDistributedLanczosSolver(symmetric, rank, false);
+    for (int i = 0; i < state.getIterationNumber(); i++) {
+      Vector v = state.getBasisVector(i).normalize();
+      Vector w = allAtOnceState.getBasisVector(i).normalize();
+      double diff = v.minus(w).norm(2);
+      assertTrue("basis " + i + " differs too much: " + diff, diff < 0.1);
+    }
+    counter++;
+  }
+
+  // TODO when this can be made to run in under 20 minutes, re-enable
+  /*
+  @Test
+  public void testDistributedLanczosSolver() throws Exception {
+    doTestDistributedLanczosSolver(true, 30);
+    doTestDistributedLanczosSolver(false, 30);
+    doTestResumeIteration(true);
+    doTestResumeIteration(false);
+  }
+   */
+
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/5eda9e1f/community/mahout-mr/src/test/java/org/apache/mahout/math/hadoop/decomposer/TestDistributedLanczosSolverCLI.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/src/test/java/org/apache/mahout/math/hadoop/decomposer/TestDistributedLanczosSolverCLI.java b/community/mahout-mr/src/test/java/org/apache/mahout/math/hadoop/decomposer/TestDistributedLanczosSolverCLI.java
new file mode 100644
index 0000000..5dfb328
--- /dev/null
+++ b/community/mahout-mr/src/test/java/org/apache/mahout/math/hadoop/decomposer/TestDistributedLanczosSolverCLI.java
@@ -0,0 +1,190 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.hadoop.decomposer;
+
+import com.google.common.collect.Lists;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.mahout.common.MahoutTestCase;
+import org.apache.mahout.common.iterator.sequencefile.SequenceFileValueIterable;
+import org.apache.mahout.math.DenseMatrix;
+import org.apache.mahout.math.Matrix;
+import org.apache.mahout.math.NamedVector;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.VectorWritable;
+import org.apache.mahout.math.hadoop.DistributedRowMatrix;
+import org.apache.mahout.math.hadoop.TestDistributedRowMatrix;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.Collection;
+
+@Deprecated
+public final class TestDistributedLanczosSolverCLI extends MahoutTestCase {
+  private static final Logger log = LoggerFactory.getLogger(TestDistributedLanczosSolverCLI.class);
+
+  @Test
+  public void testDistributedLanczosSolverCLI() throws Exception {
+    Path testData = getTestTempDirPath("testdata");
+    DistributedRowMatrix corpus =
+        new TestDistributedRowMatrix().randomDenseHierarchicalDistributedMatrix(10, 9, false,
+            testData.toString());
+    corpus.setConf(getConfiguration());
+    Path output = getTestTempDirPath("output");
+    Path tmp = getTestTempDirPath("tmp");
+    Path workingDir = getTestTempDirPath("working");
+    String[] args = {
+        "-i", new Path(testData, "distMatrix").toString(),
+        "-o", output.toString(),
+        "--tempDir", tmp.toString(),
+        "--numRows", "10",
+        "--numCols", "9",
+        "--rank", "6",
+        "--symmetric", "false",
+        "--workingDir", workingDir.toString()
+    };
+    ToolRunner.run(getConfiguration(), new DistributedLanczosSolver().new DistributedLanczosSolverJob(), args);
+
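+    // Run the solver a second time at rank 7 into fresh output/tmp dirs; the
+    // raw-eigenvector output of this second run is what gets read back below.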
+    output = getTestTempDirPath("output2");
+    tmp = getTestTempDirPath("tmp2");
+    args = new String[] {
+        "-i", new Path(testData, "distMatrix").toString(),
+        "-o", output.toString(),
+        "--tempDir", tmp.toString(),
+        "--numRows", "10",
+        "--numCols", "9",
+        "--rank", "7",
+        "--symmetric", "false",
+        "--workingDir", workingDir.toString()
+    };
+    ToolRunner.run(getConfiguration(), new DistributedLanczosSolver().new DistributedLanczosSolverJob(), args);
+
+    Path rawEigenvectors = new Path(output, DistributedLanczosSolver.RAW_EIGENVECTORS);
+    Matrix eigenVectors = new DenseMatrix(7, corpus.numCols());
+    Configuration conf = getConfiguration();
+
+    int i = 0;
+    for (VectorWritable value : new SequenceFileValueIterable<VectorWritable>(rawEigenvectors, conf)) {
+      Vector v = value.get();
+      eigenVectors.assignRow(i, v);
+      i++;
+    }
+    assertEquals("number of eigenvectors", 7, i);
+  }
+
+  @Test
+  public void testDistributedLanczosSolverEVJCLI() throws Exception {
+    Path testData = getTestTempDirPath("testdata");
+    DistributedRowMatrix corpus = new TestDistributedRowMatrix()
+        .randomDenseHierarchicalDistributedMatrix(10, 9, false, testData.toString());
+    corpus.setConf(getConfiguration());
+    Path output = getTestTempDirPath("output");
+    Path tmp = getTestTempDirPath("tmp");
+    String[] args = {
+        "-i", new Path(testData, "distMatrix").toString(),
+        "-o", output.toString(),
+        "--tempDir", tmp.toString(),
+        "--numRows", "10",
+        "--numCols", "9",
+        "--rank", "6",
+        "--symmetric", "false",
+        "--cleansvd", "true"
+    };
+    ToolRunner.run(getConfiguration(), new DistributedLanczosSolver().new DistributedLanczosSolverJob(), args);
+
+    Path cleanEigenvectors = new Path(output, EigenVerificationJob.CLEAN_EIGENVECTORS);
+    Matrix eigenVectors = new DenseMatrix(6, corpus.numCols());
+    Collection<Double> eigenvalues = Lists.newArrayList();
+
+    output = getTestTempDirPath("output2");
+    tmp = getTestTempDirPath("tmp2");
+    args = new String[] {
+        "-i", new Path(testData, "distMatrix").toString(),
+        "-o", output.toString(),
+        "--tempDir", tmp.toString(),
+        "--numRows", "10",
+        "--numCols", "9",
+        "--rank", "7",
+        "--symmetric", "false",
+        "--cleansvd", "true"
+    };
+    ToolRunner.run(getConfiguration(), new DistributedLanczosSolver().new DistributedLanczosSolverJob(), args);
+    Path cleanEigenvectors2 = new Path(output, EigenVerificationJob.CLEAN_EIGENVECTORS);
+    Matrix eigenVectors2 = new DenseMatrix(7, corpus.numCols());
+    Configuration conf = getConfiguration();
+    Collection<Double> newEigenValues = Lists.newArrayList();
+
+    int i = 0;
+    for (VectorWritable value : new SequenceFileValueIterable<VectorWritable>(cleanEigenvectors, conf)) {
+      NamedVector v = (NamedVector) value.get();
+      eigenVectors.assignRow(i, v);
+      log.info(v.getName());
+      if (EigenVector.getCosAngleError(v.getName()) < 1.0e-3) {
+        eigenvalues.add(EigenVector.getEigenValue(v.getName()));
+      }
+      i++;
+    }
+    assertEquals("number of clean eigenvectors", 3, i);
+
+    i = 0;
+    for (VectorWritable value : new SequenceFileValueIterable<VectorWritable>(cleanEigenvectors2, conf)) {
+      NamedVector v = (NamedVector) value.get();
+      log.info(v.getName());
+      eigenVectors2.assignRow(i, v);
+      newEigenValues.add(EigenVector.getEigenValue(v.getName()));
+      i++;
+    }
+
+    Collection<Integer> oldEigensFound = Lists.newArrayList();
+    for (int row = 0; row < eigenVectors.numRows(); row++) {
+      Vector oldEigen = eigenVectors.viewRow(row);
+      if (oldEigen == null) {
+        break;
+      }
+      for (int newRow = 0; newRow < eigenVectors2.numRows(); newRow++) {
+        Vector newEigen = eigenVectors2.viewRow(newRow);
+        if (newEigen != null && oldEigen.dot(newEigen) > 0.9) {
+          oldEigensFound.add(row);
+          break;
+        }
+      }
+    }
+    assertEquals("the number of new eigenvectors", 5, i);
+
+    Collection<Double> oldEigenValuesNotFound = Lists.newArrayList();
+    for (double d : eigenvalues) {
+      boolean found = false;
+      for (double newD : newEigenValues) {
+        if (Math.abs((d - newD)/d) < 0.1) {
+          found = true;
+        }
+      }
+      if (!found) {
+        oldEigenValuesNotFound.add(d);
+      }
+    }
+    assertEquals("number of old eigenvalues not found: "
+                 + Arrays.toString(oldEigenValuesNotFound.toArray(new Double[oldEigenValuesNotFound.size()])),
+                0, oldEigenValuesNotFound.size());
+    assertEquals("did not find enough old eigenvectors", 3, oldEigensFound.size());
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/5eda9e1f/community/mahout-mr/src/test/java/org/apache/mahout/math/hadoop/similarity/TestVectorDistanceSimilarityJob.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/src/test/java/org/apache/mahout/math/hadoop/similarity/TestVectorDistanceSimilarityJob.java b/community/mahout-mr/src/test/java/org/apache/mahout/math/hadoop/similarity/TestVectorDistanceSimilarityJob.java
new file mode 100644
index 0000000..a8a861c
--- /dev/null
+++ b/community/mahout-mr/src/test/java/org/apache/mahout/math/hadoop/similarity/TestVectorDistanceSimilarityJob.java
@@ -0,0 +1,238 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.hadoop.similarity;
+
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.mahout.clustering.ClusteringTestUtils;
+import org.apache.mahout.common.DummyOutputCollector;
+import org.apache.mahout.common.MahoutTestCase;
+import org.apache.mahout.common.Pair;
+import org.apache.mahout.common.StringTuple;
+import org.apache.mahout.common.commandline.DefaultOptionCreator;
+import org.apache.mahout.common.distance.EuclideanDistanceMeasure;
+import org.apache.mahout.common.iterator.sequencefile.SequenceFileIterable;
+import org.apache.mahout.math.DenseVector;
+import org.apache.mahout.math.NamedVector;
+import org.apache.mahout.math.RandomAccessSparseVector;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.VectorWritable;
+import org.easymock.EasyMock;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
+public class TestVectorDistanceSimilarityJob extends MahoutTestCase {
+
+  private FileSystem fs;
+
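+  // Nine 2-D reference points near the diagonal from (1,1) to (5,5), and two
+  // seed points; each test measures every reference point against every seed.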
+  private static final double[][] REFERENCE = { { 1, 1 }, { 2, 1 }, { 1, 2 }, { 2, 2 }, { 3, 3 }, { 4, 4 }, { 5, 4 },
+      { 4, 5 }, { 5, 5 } };
+
+  private static final double[][] SEEDS = { { 1, 1 }, { 10, 10 } };
+
+  @Override
+  @Before
+  public void setUp() throws Exception {
+    super.setUp();
+    fs = FileSystem.get(getConfiguration());
+  }
+
+  @Test
+  public void testVectorDistanceMapper() throws Exception {
+    Mapper<WritableComparable<?>, VectorWritable, StringTuple, DoubleWritable>.Context context =
+            EasyMock.createMock(Mapper.Context.class);
+    StringTuple tuple = new StringTuple();
+    tuple.add("foo");
+    tuple.add("123");
+    context.write(tuple, new DoubleWritable(Math.sqrt(2.0)));
+    tuple = new StringTuple();
+    tuple.add("foo2");
+    tuple.add("123");
+    context.write(tuple, new DoubleWritable(1));
+
+    EasyMock.replay(context);
+
+    Vector vector = new RandomAccessSparseVector(2);
+    vector.set(0, 2);
+    vector.set(1, 2);
+
+    VectorDistanceMapper mapper = new VectorDistanceMapper();
+    setField(mapper, "measure", new EuclideanDistanceMeasure());
+    Collection<NamedVector> seedVectors = Lists.newArrayList();
+    Vector seed1 = new RandomAccessSparseVector(2);
+    seed1.set(0, 1);
+    seed1.set(1, 1);
+    Vector seed2 = new RandomAccessSparseVector(2);
+    seed2.set(0, 2);
+    seed2.set(1, 1);
+
+    seedVectors.add(new NamedVector(seed1, "foo"));
+    seedVectors.add(new NamedVector(seed2, "foo2"));
+    setField(mapper, "seedVectors", seedVectors);
+
+    mapper.map(new IntWritable(123), new VectorWritable(vector), context);
+
+    EasyMock.verify(context);
+  }
+
+  @Test
+  public void testVectorDistanceInvertedMapper() throws Exception {
+    Mapper<WritableComparable<?>, VectorWritable, Text, VectorWritable>.Context context =
+            EasyMock.createMock(Mapper.Context.class);
+    Vector expectVec = new DenseVector(new double[]{Math.sqrt(2.0), 1.0});
+    context.write(new Text("other"), new VectorWritable(expectVec));
+    EasyMock.replay(context);
+    Vector vector = new NamedVector(new RandomAccessSparseVector(2), "other");
+    vector.set(0, 2);
+    vector.set(1, 2);
+
+    VectorDistanceInvertedMapper mapper = new VectorDistanceInvertedMapper();
+    setField(mapper, "measure", new EuclideanDistanceMeasure());
+    Collection<NamedVector> seedVectors = Lists.newArrayList();
+    Vector seed1 = new RandomAccessSparseVector(2);
+    seed1.set(0, 1);
+    seed1.set(1, 1);
+    Vector seed2 = new RandomAccessSparseVector(2);
+    seed2.set(0, 2);
+    seed2.set(1, 1);
+
+    seedVectors.add(new NamedVector(seed1, "foo"));
+    seedVectors.add(new NamedVector(seed2, "foo2"));
+    setField(mapper, "seedVectors", seedVectors);
+
+    mapper.map(new IntWritable(123), new VectorWritable(vector), context);
+
+    EasyMock.verify(context);
+  }
+
+  @Test
+  public void testRun() throws Exception {
+    Path input = getTestTempDirPath("input");
+    Path output = getTestTempDirPath("output");
+    Path seedsPath = getTestTempDirPath("seeds");
+
+    List<VectorWritable> points = getPointsWritable(REFERENCE);
+    List<VectorWritable> seeds = getPointsWritable(SEEDS);
+
+    Configuration conf = getConfiguration();
+    ClusteringTestUtils.writePointsToFile(points, true, new Path(input, "file1"), fs, conf);
+    ClusteringTestUtils.writePointsToFile(seeds, true, new Path(seedsPath, "part-seeds"), fs, conf);
+
+    String[] args = { optKey(DefaultOptionCreator.INPUT_OPTION), input.toString(),
+        optKey(VectorDistanceSimilarityJob.SEEDS), seedsPath.toString(), optKey(DefaultOptionCreator.OUTPUT_OPTION),
+        output.toString(), optKey(DefaultOptionCreator.DISTANCE_MEASURE_OPTION),
+        EuclideanDistanceMeasure.class.getName() };
+
+    ToolRunner.run(getConfiguration(), new VectorDistanceSimilarityJob(), args);
+
+    int expectedOutputSize = SEEDS.length * REFERENCE.length;
+    int outputSize = Iterables.size(new SequenceFileIterable<StringTuple, DoubleWritable>(new Path(output,
+        "part-m-00000"), conf));
+    assertEquals(expectedOutputSize, outputSize);
+  }
+
+  @Test
+  public void testMaxDistance() throws Exception {
+
+    Path input = getTestTempDirPath("input");
+    Path output = getTestTempDirPath("output");
+    Path seedsPath = getTestTempDirPath("seeds");
+
+    List<VectorWritable> points = getPointsWritable(REFERENCE);
+    List<VectorWritable> seeds = getPointsWritable(SEEDS);
+
+    Configuration conf = getConfiguration();
+    ClusteringTestUtils.writePointsToFile(points, true, new Path(input, "file1"), fs, conf);
+    ClusteringTestUtils.writePointsToFile(seeds, true, new Path(seedsPath, "part-seeds"), fs, conf);
+
+    double maxDistance = 10;
+
+    String[] args = { optKey(DefaultOptionCreator.INPUT_OPTION), input.toString(),
+        optKey(VectorDistanceSimilarityJob.SEEDS), seedsPath.toString(), optKey(DefaultOptionCreator.OUTPUT_OPTION),
+        output.toString(), optKey(DefaultOptionCreator.DISTANCE_MEASURE_OPTION),
+        EuclideanDistanceMeasure.class.getName(),
+        optKey(VectorDistanceSimilarityJob.MAX_DISTANCE), String.valueOf(maxDistance) };
+
+    ToolRunner.run(getConfiguration(), new VectorDistanceSimilarityJob(), args);
+
+    int outputSize = 0;
+
+    for (Pair<StringTuple, DoubleWritable> record : new SequenceFileIterable<StringTuple, DoubleWritable>(
+        new Path(output, "part-m-00000"), conf)) {
+      outputSize++;
+      assertTrue(record.getSecond().get() <= maxDistance);
+    }
+
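+    // 2 seeds x 9 points = 18 pairs; the four reference points closest to the
+    // origin lie more than 10 from seed (10,10), so 18 - 4 = 14 pairs remain.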
+    assertEquals(14, outputSize);
+  }
+
+  @Test
+  public void testRunInverted() throws Exception {
+    Path input = getTestTempDirPath("input");
+    Path output = getTestTempDirPath("output");
+    Path seedsPath = getTestTempDirPath("seeds");
+    List<VectorWritable> points = getPointsWritable(REFERENCE);
+    List<VectorWritable> seeds = getPointsWritable(SEEDS);
+    Configuration conf = getConfiguration();
+    ClusteringTestUtils.writePointsToFile(points, true, new Path(input, "file1"), fs, conf);
+    ClusteringTestUtils.writePointsToFile(seeds, true, new Path(seedsPath, "part-seeds"), fs, conf);
+    String[] args = {optKey(DefaultOptionCreator.INPUT_OPTION), input.toString(),
+        optKey(VectorDistanceSimilarityJob.SEEDS), seedsPath.toString(), optKey(DefaultOptionCreator.OUTPUT_OPTION),
+        output.toString(), optKey(DefaultOptionCreator.DISTANCE_MEASURE_OPTION),
+        EuclideanDistanceMeasure.class.getName(),
+        optKey(VectorDistanceSimilarityJob.OUT_TYPE_KEY), "v"
+    };
+    ToolRunner.run(getConfiguration(), new VectorDistanceSimilarityJob(), args);
+
+    DummyOutputCollector<Text, VectorWritable> collector = new DummyOutputCollector<>();
+
+    for (Pair<Text, VectorWritable> record :  new SequenceFileIterable<Text, VectorWritable>(
+        new Path(output, "part-m-00000"), conf)) {
+      collector.collect(record.getFirst(), record.getSecond());
+    }
+    assertEquals(REFERENCE.length, collector.getData().size());
+    for (Map.Entry<Text, List<VectorWritable>> entry : collector.getData().entrySet()) {
+      assertEquals(SEEDS.length, entry.getValue().iterator().next().get().size());
+    }
+  }
+
+  private static List<VectorWritable> getPointsWritable(double[][] raw) {
+    List<VectorWritable> points = Lists.newArrayList();
+    for (double[] fr : raw) {
+      Vector vec = new RandomAccessSparseVector(fr.length);
+      vec.assign(fr);
+      points.add(new VectorWritable(vec));
+    }
+    return points;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/5eda9e1f/community/mahout-mr/src/test/java/org/apache/mahout/math/hadoop/similarity/cooccurrence/RowSimilarityJobTest.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/src/test/java/org/apache/mahout/math/hadoop/similarity/cooccurrence/RowSimilarityJobTest.java b/community/mahout-mr/src/test/java/org/apache/mahout/math/hadoop/similarity/cooccurrence/RowSimilarityJobTest.java
new file mode 100644
index 0000000..5d64f90
--- /dev/null
+++ b/community/mahout-mr/src/test/java/org/apache/mahout/math/hadoop/similarity/cooccurrence/RowSimilarityJobTest.java
@@ -0,0 +1,214 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.hadoop.similarity.cooccurrence;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.mahout.common.MahoutTestCase;
+import org.apache.mahout.math.Matrix;
+import org.apache.mahout.math.hadoop.MathHelper;
+import org.apache.mahout.math.hadoop.similarity.cooccurrence.measures.TanimotoCoefficientSimilarity;
+import org.apache.mahout.math.map.OpenIntIntHashMap;
+import org.junit.Test;
+
+import java.io.File;
+
+public class RowSimilarityJobTest extends MahoutTestCase {
+
+  /**
+   * Integration test with a tiny data set.
+   *
+   * <pre>
+   *
+   * input matrix:
+   *
+   * 1, 0, 1, 1, 0
+   * 0, 0, 1, 1, 0
+   * 0, 0, 0, 0, 1
+   *
+   * similarity matrix (via Tanimoto):
+   *
+   * 1,     0.666, 0
+   * 0.666, 1,     0
+   * 0,     0,     1
+   * </pre>
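+   *
+   * For example: rows 0 and 1 share the nonzero columns {2, 3} and jointly
+   * touch {0, 2, 3}, so their Tanimoto coefficient is
+   * |intersection| / |union| = 2/3 = 0.666; row 2 shares no column with the
+   * others, hence the zeros.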
+   * @throws Exception
+   */
+  @Test
+  public void toyIntegration() throws Exception {
+
+    File inputFile = getTestTempFile("rows");
+    File outputDir = getTestTempDir("output");
+    outputDir.delete();
+    File tmpDir = getTestTempDir("tmp");
+
+    Configuration conf = getConfiguration();
+    Path inputPath = new Path(inputFile.getAbsolutePath());
+    FileSystem fs = FileSystem.get(inputPath.toUri(), conf);
+
+    MathHelper.writeDistributedRowMatrix(new double[][] {
+        new double[] { 1, 0, 1, 1, 0 },
+        new double[] { 0, 0, 1, 1, 0 },
+        new double[] { 0, 0, 0, 0, 1 } },
+        fs, conf, inputPath);
+
+    RowSimilarityJob rowSimilarityJob = new RowSimilarityJob();
+    rowSimilarityJob.setConf(conf);
+    rowSimilarityJob.run(new String[] { "--input", inputFile.getAbsolutePath(), "--output", outputDir.getAbsolutePath(),
+        "--numberOfColumns", String.valueOf(5), "--similarityClassname", TanimotoCoefficientSimilarity.class.getName(),
+        "--tempDir", tmpDir.getAbsolutePath() });
+
+    OpenIntIntHashMap observationsPerColumn =
+        Vectors.readAsIntMap(new Path(tmpDir.getAbsolutePath(), "observationsPerColumn.bin"), conf);
+    assertEquals(4, observationsPerColumn.size());
+    assertEquals(1, observationsPerColumn.get(0));
+    assertEquals(2, observationsPerColumn.get(2));
+    assertEquals(2, observationsPerColumn.get(3));
+    assertEquals(1, observationsPerColumn.get(4));
+
+    Matrix similarityMatrix = MathHelper.readMatrix(conf, new Path(outputDir.getAbsolutePath(), "part-r-00000"), 3, 3);
+
+    assertNotNull(similarityMatrix);
+    assertEquals(3, similarityMatrix.numCols());
+    assertEquals(3, similarityMatrix.numRows());
+
+    assertEquals(1.0, similarityMatrix.get(0, 0), EPSILON);
+    assertEquals(1.0, similarityMatrix.get(1, 1), EPSILON);
+    assertEquals(1.0, similarityMatrix.get(2, 2), EPSILON);
+    assertEquals(0.0, similarityMatrix.get(2, 0), EPSILON);
+    assertEquals(0.0, similarityMatrix.get(2, 1), EPSILON);
+    assertEquals(0.0, similarityMatrix.get(0, 2), EPSILON);
+    assertEquals(0.0, similarityMatrix.get(1, 2), EPSILON);
+    assertEquals(0.666666, similarityMatrix.get(0, 1), EPSILON);
+    assertEquals(0.666666, similarityMatrix.get(1, 0), EPSILON);
+  }
+
+  @Test
+  public void toyIntegrationMaxSimilaritiesPerRow() throws Exception {
+
+    File inputFile = getTestTempFile("rows");
+    File outputDir = getTestTempDir("output");
+    outputDir.delete();
+    File tmpDir = getTestTempDir("tmp");
+
+    Configuration conf = getConfiguration();
+    Path inputPath = new Path(inputFile.getAbsolutePath());
+    FileSystem fs = FileSystem.get(inputPath.toUri(), conf);
+
+    MathHelper.writeDistributedRowMatrix(new double[][] {
+        new double[] { 1, 0, 1, 1, 0, 1 },
+        new double[] { 0, 1, 1, 1, 1, 1 },
+        new double[] { 1, 1, 0, 1, 0, 0 } },
+        fs, conf, inputPath);
+
+    RowSimilarityJob rowSimilarityJob = new RowSimilarityJob();
+    rowSimilarityJob.setConf(conf);
+    rowSimilarityJob.run(new String[] { "--input", inputFile.getAbsolutePath(), "--output", outputDir.getAbsolutePath(),
+        "--numberOfColumns", String.valueOf(6), "--similarityClassname", TanimotoCoefficientSimilarity.class.getName(),
+        "--maxSimilaritiesPerRow", String.valueOf(1), "--excludeSelfSimilarity", String.valueOf(true),
+        "--tempDir", tmpDir.getAbsolutePath() });
+
+    Matrix similarityMatrix = MathHelper.readMatrix(conf, new Path(outputDir.getAbsolutePath(), "part-r-00000"), 3, 3);
+
+    assertNotNull(similarityMatrix);
+    assertEquals(3, similarityMatrix.numCols());
+    assertEquals(3, similarityMatrix.numRows());
+
+    assertEquals(0.0, similarityMatrix.get(0, 0), EPSILON);
+    assertEquals(0.5, similarityMatrix.get(0, 1), EPSILON);
+    assertEquals(0.0, similarityMatrix.get(0, 2), EPSILON);
+
+    assertEquals(0.5, similarityMatrix.get(1, 0), EPSILON);
+    assertEquals(0.0, similarityMatrix.get(1, 1), EPSILON);
+    assertEquals(0.0, similarityMatrix.get(1, 2), EPSILON);
+
+    assertEquals(0.4, similarityMatrix.get(2, 0), EPSILON);
+    assertEquals(0.0, similarityMatrix.get(2, 1), EPSILON);
+    assertEquals(0.0, similarityMatrix.get(2, 2), EPSILON);
+  }
+
+  @Test
+  public void toyIntegrationWithThreshold() throws Exception {
+
+    File inputFile = getTestTempFile("rows");
+    File outputDir = getTestTempDir("output");
+    outputDir.delete();
+    File tmpDir = getTestTempDir("tmp");
+
+    Configuration conf = getConfiguration();
+    Path inputPath = new Path(inputFile.getAbsolutePath());
+    FileSystem fs = FileSystem.get(inputPath.toUri(), conf);
+
+    MathHelper.writeDistributedRowMatrix(new double[][] {
+        new double[] { 1, 0, 1, 1, 0, 1 },
+        new double[] { 0, 1, 1, 1, 1, 1 },
+        new double[] { 1, 1, 0, 1, 0, 0 } },
+        fs, conf, inputPath);
+
+    RowSimilarityJob rowSimilarityJob = new RowSimilarityJob();
+    rowSimilarityJob.setConf(conf);
+    rowSimilarityJob.run(new String[] { "--input", inputFile.getAbsolutePath(), "--output", outputDir.getAbsolutePath(),
+        "--numberOfColumns", String.valueOf(6), "--similarityClassname", TanimotoCoefficientSimilarity.class.getName(),
+        "--excludeSelfSimilarity", String.valueOf(true), "--threshold", String.valueOf(0.5),
+        "--tempDir", tmpDir.getAbsolutePath() });
+
+    Matrix similarityMatrix = MathHelper.readMatrix(conf, new Path(outputDir.getAbsolutePath(), "part-r-00000"), 3, 3);
+
+    assertNotNull(similarityMatrix);
+    assertEquals(3, similarityMatrix.numCols());
+    assertEquals(3, similarityMatrix.numRows());
+
+    assertEquals(0.0, similarityMatrix.get(0, 0), EPSILON);
+    assertEquals(0.5, similarityMatrix.get(0, 1), EPSILON);
+    assertEquals(0.0, similarityMatrix.get(0, 2), EPSILON);
+
+    assertEquals(0.5, similarityMatrix.get(1, 0), EPSILON);
+    assertEquals(0.0, similarityMatrix.get(1, 1), EPSILON);
+    assertEquals(0.0, similarityMatrix.get(1, 2), EPSILON);
+
+    assertEquals(0.0, similarityMatrix.get(2, 0), EPSILON);
+    assertEquals(0.0, similarityMatrix.get(2, 1), EPSILON);
+    assertEquals(0.0, similarityMatrix.get(2, 2), EPSILON);
+  }
+
+  @Test
+  public void testVectorDimensions() throws Exception {
+
+    File inputFile = getTestTempFile("rows");
+
+    Configuration conf = getConfiguration();
+    Path inputPath = new Path(inputFile.getAbsolutePath());
+    FileSystem fs = FileSystem.get(inputPath.toUri(), conf);
+
+    MathHelper.writeDistributedRowMatrix(new double[][] {
+        new double[] { 1, 0, 1, 1, 0, 1 },
+        new double[] { 0, 1, 1, 1, 1, 1 },
+        new double[] { 1, 1, 0, 1, 0, 0 } },
+        fs, conf, inputPath);
+
+    RowSimilarityJob rowSimilarityJob = new RowSimilarityJob();
+    rowSimilarityJob.setConf(conf);
+
+    int numberOfColumns = rowSimilarityJob.getDimensions(inputPath);
+
+    assertEquals(6, numberOfColumns);
+  }
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/5eda9e1f/community/mahout-mr/src/test/java/org/apache/mahout/math/hadoop/similarity/cooccurrence/measures/VectorSimilarityMeasuresTest.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/src/test/java/org/apache/mahout/math/hadoop/similarity/cooccurrence/measures/VectorSimilarityMeasuresTest.java b/community/mahout-mr/src/test/java/org/apache/mahout/math/hadoop/similarity/cooccurrence/measures/VectorSimilarityMeasuresTest.java
new file mode 100644
index 0000000..c8a8c51
--- /dev/null
+++ b/community/mahout-mr/src/test/java/org/apache/mahout/math/hadoop/similarity/cooccurrence/measures/VectorSimilarityMeasuresTest.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.hadoop.similarity.cooccurrence.measures;
+
+import org.apache.mahout.common.ClassUtils;
+import org.apache.mahout.common.MahoutTestCase;
+import org.apache.mahout.math.DenseVector;
+import org.apache.mahout.math.RandomAccessSparseVector;
+import org.apache.mahout.math.SequentialAccessSparseVector;
+import org.apache.mahout.math.Vector;
+import org.junit.Test;
+
+public class VectorSimilarityMeasuresTest extends MahoutTestCase {
+
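+  // Computes each similarity with three different Vector implementations and
+  // asserts they agree, so the measures do not depend on the vector storage.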
+  static double distributedSimilarity(double[] one,
+                                      double[] two,
+                                      Class<? extends VectorSimilarityMeasure> similarityMeasureClass) {
+    double rand = computeSimilarity(one, two, similarityMeasureClass, new RandomAccessSparseVector(one.length));
+    double seq = computeSimilarity(one, two, similarityMeasureClass, new SequentialAccessSparseVector(one.length));
+    double dense = computeSimilarity(one, two, similarityMeasureClass, new DenseVector(one.length));
+    assertEquals(seq, rand, 1.0e-10);
+    assertEquals(seq, dense, 1.0e-10);
+    assertEquals(dense, rand, 1.0e-10);
+    return seq;
+  }
+
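+  // Mimics the distributed computation: normalize both vectors, compute each
+  // norm, aggregate over dimensions that are nonzero in both vectors, and let
+  // the measure combine the aggregated "dot", the norms, and the length.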
+  private static double computeSimilarity(double[] one, double[] two,
+      Class<? extends VectorSimilarityMeasure> similarityMeasureClass,
+      Vector like) {
+    VectorSimilarityMeasure similarityMeasure = ClassUtils.instantiateAs(similarityMeasureClass,
+        VectorSimilarityMeasure.class);
+    Vector oneNormalized = similarityMeasure.normalize(asVector(one, like));
+    Vector twoNormalized = similarityMeasure.normalize(asVector(two, like));
+
+    double normOne = similarityMeasure.norm(oneNormalized);
+    double normTwo = similarityMeasure.norm(twoNormalized);
+
+    double dot = 0;
+    for (int n = 0; n < one.length; n++) {
+      if (oneNormalized.get(n) != 0 && twoNormalized.get(n) != 0) {
+        dot += similarityMeasure.aggregate(oneNormalized.get(n), twoNormalized.get(n));
+      }
+    }
+
+    return similarityMeasure.similarity(dot, normOne, normTwo, one.length);
+  }
+
+  static Vector asVector(double[] values, Vector like) {
+    Vector vector = like.like();
+    for (int dim = 0; dim < values.length; dim++) {
+      vector.set(dim, values[dim]);
+    }
+    return vector;
+  }
+
+  @Test
+  public void testCooccurrenceCountSimilarity() {
+    double similarity = distributedSimilarity(
+        new double[] { 0, 1, 0, 0, 1, 1, 0, 1, 0, 1, 1, 1, 0 },
+        new double[] { 1, 0, 0, 0, 1, 0, 1, 1, 1, 1, 1, 1, 1 }, CooccurrenceCountSimilarity.class);
+
+    assertEquals(5.0, similarity, 0);
+  }
+
+  @Test
+  public void testTanimotoCoefficientSimilarity() {
+    double similarity = distributedSimilarity(
+        new double[] { 0, 1, 0, 0, 1, 1, 0, 1, 0, 1, 1, 1, 0 },
+        new double[] { 1, 0, 0, 0, 1, 0, 1, 1, 1, 1, 1, 1, 1 }, TanimotoCoefficientSimilarity.class);
+
+    assertEquals(0.454545455, similarity, EPSILON);
+  }
+
+  @Test
+  public void testCityblockSimilarity() {
+    double similarity = distributedSimilarity(
+        new double[] { 0, 1, 0, 0, 1, 1, 0, 1, 0, 1, 1, 1, 0 },
+        new double[] { 1, 0, 0, 0, 1, 0, 1, 1, 1, 1, 1, 1, 1 }, CityBlockSimilarity.class);
+
+    assertEquals(0.142857143, similarity, EPSILON);
+  }
+
+  @Test
+  public void testLoglikelihoodSimilarity() {
+    double similarity = distributedSimilarity(
+        new double[] { 0, 1, 0, 0, 1, 1, 0, 1, 0, 1, 1, 1, 0 },
+        new double[] { 1, 0, 0, 0, 1, 0, 1, 1, 1, 1, 1, 1, 1 }, LoglikelihoodSimilarity.class);
+
+    assertEquals(0.03320155369284261, similarity, EPSILON);
+  }
+
+  @Test
+  public void testCosineSimilarity() {
+    double similarity = distributedSimilarity(
+        new double[] { 0, 2, 0, 0, 8, 3, 0, 6, 0, 1, 2, 2, 0 },
+        new double[] { 3, 0, 0, 0, 7, 0, 2, 2, 1, 3, 2, 1, 1 }, CosineSimilarity.class);
+
+    assertEquals(0.769846046, similarity, EPSILON);
+  }
+
+  @Test
+  public void testPearsonCorrelationSimilarity() {
+    double similarity = distributedSimilarity(
+        new double[] { 0, 2, 0, 0, 8, 3, 0, 6, 0, 1, 1, 2, 1 },
+        new double[] { 3, 0, 0, 0, 7, 0, 2, 2, 1, 3, 2, 4, 3 }, PearsonCorrelationSimilarity.class);
+
+    assertEquals(0.5303300858899108, similarity, EPSILON);
+  }
+
+  @Test
+  public void testEuclideanDistanceSimilarity() {
+    double similarity = distributedSimilarity(
+        new double[] { 0, 2, 0, 0, 8, 3, 0, 6, 0, 1, 1, 2, 1 },
+        new double[] { 3, 0, 0, 0, 7, 0, 2, 2, 1, 3, 2, 4, 4 }, EuclideanDistanceSimilarity.class);
+
+    assertEquals(0.11268865367232477, similarity, EPSILON);
+  }
+}
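
Worth noting how the expected constants above can be checked by hand for the binary
inputs: the co-occurrence count is the size of the intersection of the non-zero index
sets (5), Tanimoto is intersection over union (5 / (7 + 9 - 5) = 5/11 ~ 0.4545), and
city block is 1 / (1 + Manhattan distance) = 1/7 ~ 0.1429. A minimal standalone sketch
of that arithmetic (plain Java, no Mahout dependencies; the class name is illustrative):

    public class BinarySimilaritySketch {
      public static void main(String[] args) {
        double[] one = { 0, 1, 0, 0, 1, 1, 0, 1, 0, 1, 1, 1, 0 };
        double[] two = { 1, 0, 0, 0, 1, 0, 1, 1, 1, 1, 1, 1, 1 };
        int both = 0, onlyOne = 0, onlyTwo = 0;
        for (int i = 0; i < one.length; i++) {
          if (one[i] != 0 && two[i] != 0) { both++; }
          else if (one[i] != 0) { onlyOne++; }
          else if (two[i] != 0) { onlyTwo++; }
        }
        // co-occurrence count: size of the intersection of non-zero index sets
        System.out.println("cooccurrence = " + both);                                   // 5
        // Tanimoto: |intersection| / |union|
        System.out.println("tanimoto = " + (double) both / (both + onlyOne + onlyTwo)); // 0.4545...
        // city block: 1 / (1 + L1 distance); for 0/1 vectors the distance is the
        // number of positions where exactly one of the two vectors is set
        System.out.println("cityblock = " + 1.0 / (1 + onlyOne + onlyTwo));             // 0.1428...
      }
    }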

http://git-wip-us.apache.org/repos/asf/mahout/blob/5eda9e1f/community/mahout-mr/src/test/java/org/apache/mahout/math/hadoop/solver/TestDistributedConjugateGradientSolver.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/src/test/java/org/apache/mahout/math/hadoop/solver/TestDistributedConjugateGradientSolver.java b/community/mahout-mr/src/test/java/org/apache/mahout/math/hadoop/solver/TestDistributedConjugateGradientSolver.java
new file mode 100644
index 0000000..e8487ad
--- /dev/null
+++ b/community/mahout-mr/src/test/java/org/apache/mahout/math/hadoop/solver/TestDistributedConjugateGradientSolver.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.hadoop.solver;
+
+import java.io.File;
+import java.util.Random;
+
+import org.apache.mahout.common.MahoutTestCase;
+import org.apache.mahout.common.RandomUtils;
+import org.apache.mahout.math.DenseVector;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.hadoop.DistributedRowMatrix;
+import org.apache.mahout.math.hadoop.TestDistributedRowMatrix;
+import org.junit.Test;
+
+public final class TestDistributedConjugateGradientSolver extends MahoutTestCase {
+
+  private static Vector randomVector(int size, double entryMean) {
+    DenseVector v = new DenseVector(size);
+    Random r = RandomUtils.getRandom();
+    
+    for (int i = 0; i < size; ++i) {
+      v.setQuick(i, r.nextGaussian() * entryMean);
+    }
+    
+    return v;
+  }
+
+  @Test
+  public void testSolver() throws Exception {
+    File testData = getTestTempDir("testdata");
+    DistributedRowMatrix matrix = new TestDistributedRowMatrix().randomDistributedMatrix(
+        10, 10, 10, 10, 10.0, true, testData.getAbsolutePath());
+    matrix.setConf(getConfiguration());
+    Vector vector = randomVector(matrix.numCols(), 10.0);
+    
+    DistributedConjugateGradientSolver solver = new DistributedConjugateGradientSolver();
+    Vector x = solver.solve(matrix, vector);
+
+    Vector solvedVector = matrix.times(x);    
+    double distance = Math.sqrt(vector.getDistanceSquared(solvedVector));
+    assertEquals(0.0, distance, EPSILON);
+  }
+}
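
The class under test distributes the standard conjugate gradient iteration for symmetric
positive-definite systems, running each matrix-vector product as a map-reduce pass. For
reference, a minimal in-memory sketch of the same iteration over Mahout types (not the
tested implementation; convergence handling is deliberately simplified):

    import org.apache.mahout.math.DenseVector;
    import org.apache.mahout.math.Matrix;
    import org.apache.mahout.math.Vector;

    final class SimpleCG {
      // Solves Ax = b for symmetric positive-definite A, starting from x = 0.
      static Vector solve(Matrix a, Vector b, int maxIters, double tol) {
        Vector x = new DenseVector(b.size());
        Vector r = new DenseVector(b);           // residual r = b - Ax = b when x = 0
        Vector p = new DenseVector(r);           // initial search direction
        double rsOld = r.dot(r);
        for (int i = 0; i < maxIters && Math.sqrt(rsOld) > tol; i++) {
          Vector ap = a.times(p);
          double alpha = rsOld / p.dot(ap);      // optimal step length along p
          x = x.plus(p.times(alpha));
          r = r.minus(ap.times(alpha));
          double rsNew = r.dot(r);
          p = r.plus(p.times(rsNew / rsOld));    // next A-conjugate direction
          rsOld = rsNew;
        }
        return x;
      }
    }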

http://git-wip-us.apache.org/repos/asf/mahout/blob/5eda9e1f/community/mahout-mr/src/test/java/org/apache/mahout/math/hadoop/solver/TestDistributedConjugateGradientSolverCLI.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/src/test/java/org/apache/mahout/math/hadoop/solver/TestDistributedConjugateGradientSolverCLI.java b/community/mahout-mr/src/test/java/org/apache/mahout/math/hadoop/solver/TestDistributedConjugateGradientSolverCLI.java
new file mode 100644
index 0000000..3ac9405
--- /dev/null
+++ b/community/mahout-mr/src/test/java/org/apache/mahout/math/hadoop/solver/TestDistributedConjugateGradientSolverCLI.java
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.hadoop.solver;
+
+import java.io.IOException;
+import java.util.Random;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.mahout.common.MahoutTestCase;
+import org.apache.mahout.common.RandomUtils;
+import org.apache.mahout.math.DenseVector;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.VectorWritable;
+import org.apache.mahout.math.hadoop.DistributedRowMatrix;
+import org.apache.mahout.math.hadoop.TestDistributedRowMatrix;
+import org.junit.Test;
+
+public final class TestDistributedConjugateGradientSolverCLI extends MahoutTestCase {
+
+  private static Vector randomVector(int size, double entryMean) {
+    Vector v = new DenseVector(size);
+    Random r = RandomUtils.getRandom();
+    for (int i = 0; i < size; ++i) {
+      v.setQuick(i, r.nextGaussian() * entryMean);
+    }
+    return v;
+  }
+
+  private static Path saveVector(Configuration conf, Path path, Vector v) throws IOException {
+    FileSystem fs = path.getFileSystem(conf);
+    SequenceFile.Writer writer = new SequenceFile.Writer(fs, conf, path, IntWritable.class, VectorWritable.class);
+    
+    try {
+      writer.append(new IntWritable(0), new VectorWritable(v));
+    } finally {
+      writer.close();
+    }
+    return path;
+  }
+  
+  private static Vector loadVector(Configuration conf, Path path) throws IOException {
+    FileSystem fs = path.getFileSystem(conf);
+    SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, conf);
+    Writable key = new IntWritable();
+    VectorWritable value = new VectorWritable();
+    
+    try {
+      if (!reader.next(key, value)) {
+        throw new IOException("Input vector file is empty.");      
+      }
+      return value.get();
+    } finally {
+      reader.close();
+    }
+  }
+
+  @Test
+  public void testSolver() throws Exception {
+    Configuration conf = getConfiguration();
+    Path testData = getTestTempDirPath("testdata");
+    DistributedRowMatrix matrix = new TestDistributedRowMatrix().randomDistributedMatrix(
+        10, 10, 10, 10, 10.0, true, testData.toString());
+    matrix.setConf(conf);
+    Path output = getTestTempFilePath("output");
+    Path vectorPath = getTestTempFilePath("vector");
+    Path tempPath = getTestTempDirPath("tmp");
+
+    Vector vector = randomVector(matrix.numCols(), 10.0);
+    saveVector(conf, vectorPath, vector);
+        
+    String[] args = {
+        "-i", matrix.getRowPath().toString(),
+        "-o", output.toString(),
+        "--tempDir", tempPath.toString(),
+        "--vector", vectorPath.toString(),
+        "--numRows", "10",
+        "--numCols", "10",
+        "--symmetric", "true"        
+    };
+    
+    DistributedConjugateGradientSolver solver = new DistributedConjugateGradientSolver();
+    ToolRunner.run(conf, solver.job(), args);
+    
+    Vector x = loadVector(conf, output);
+    
+    Vector solvedVector = matrix.times(x);    
+    double distance = Math.sqrt(vector.getDistanceSquared(solvedVector));
+    assertEquals(0.0, distance, EPSILON);
+  }
+}
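
The saveVector/loadVector helpers above use SequenceFile constructors that Hadoop 2.x
deprecates. A sketch of the same round trip against the option-based API (assuming
Hadoop 2.x is on the classpath; the class name is illustrative):

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.SequenceFile;
    import org.apache.mahout.math.Vector;
    import org.apache.mahout.math.VectorWritable;

    final class VectorIO {
      static void save(Configuration conf, Path path, Vector v) throws IOException {
        try (SequenceFile.Writer writer = SequenceFile.createWriter(conf,
            SequenceFile.Writer.file(path),
            SequenceFile.Writer.keyClass(IntWritable.class),
            SequenceFile.Writer.valueClass(VectorWritable.class))) {
          writer.append(new IntWritable(0), new VectorWritable(v));
        }
      }

      static Vector load(Configuration conf, Path path) throws IOException {
        try (SequenceFile.Reader reader =
                 new SequenceFile.Reader(conf, SequenceFile.Reader.file(path))) {
          IntWritable key = new IntWritable();
          VectorWritable value = new VectorWritable();
          if (!reader.next(key, value)) {
            throw new IOException("Input vector file is empty.");
          }
          return value.get();
        }
      }
    }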

http://git-wip-us.apache.org/repos/asf/mahout/blob/5eda9e1f/community/mahout-mr/src/test/java/org/apache/mahout/math/hadoop/stats/BasicStatsTest.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/src/test/java/org/apache/mahout/math/hadoop/stats/BasicStatsTest.java b/community/mahout-mr/src/test/java/org/apache/mahout/math/hadoop/stats/BasicStatsTest.java
new file mode 100644
index 0000000..7e59eb4
--- /dev/null
+++ b/community/mahout-mr/src/test/java/org/apache/mahout/math/hadoop/stats/BasicStatsTest.java
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.hadoop.stats;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.mahout.common.MahoutTestCase;
+import org.apache.mahout.common.RandomUtils;
+import org.apache.mahout.math.jet.random.Normal;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Random;
+
+public final class BasicStatsTest extends MahoutTestCase {
+
+  private Configuration conf;
+
+  @Override
+  @Before
+  public void setUp() throws Exception {
+    super.setUp();
+    conf = getConfiguration();
+  }
+
+  @Test
+  public void testVar() throws Exception {
+    Path input = getTestTempFilePath("stdDev/counts.file");
+    Path output = getTestTempFilePath("stdDev/output.file");
+    
+    produceTestData(input);
+    
+    double v = BasicStats.variance(input, output, conf);
+    assertEquals(2.44, v, 0.01);
+  }
+
+  @Test
+  public void testStdDev() throws Exception {
+    Path input = getTestTempFilePath("stdDev/counts.file");
+    Path output = getTestTempFilePath("stdDev/output.file");
+    
+    produceTestData(input);
+    
+    double v = BasicStats.stdDev(input, output, conf);
+    assertEquals(1.56, v, 0.01); // sample std dev is 1.563; population std dev of this set is 1.48
+  }
+  
+  @Test
+  public void testStdDevForGivenMean() throws Exception {
+    Path input = getTestTempFilePath("stdDev/counts.file");
+    Path output = getTestTempFilePath("stdDev/output.file");
+    
+    produceTestData(input);
+    
+    double v = BasicStats.stdDevForGivenMean(input, output, 0.0D, conf);
+    assertEquals(10.65, v, 0.01); // sample std dev around the supplied mean of 0.0 is 10.65
+  }
+  
+  private void produceTestData(Path input) throws Exception {
+    FileSystem fs = FileSystem.get(input.toUri(), conf);
+    SequenceFile.Writer writer = new SequenceFile.Writer(fs, conf, input, IntWritable.class, DoubleWritable.class);
+    int i = 0;
+    writer.append(new IntWritable(i++), new DoubleWritable(7));
+    writer.append(new IntWritable(i++), new DoubleWritable(9));
+    writer.append(new IntWritable(i++), new DoubleWritable(9));
+    writer.append(new IntWritable(i++), new DoubleWritable(10));
+    writer.append(new IntWritable(i++), new DoubleWritable(10));
+    writer.append(new IntWritable(i++), new DoubleWritable(10));
+    writer.append(new IntWritable(i++), new DoubleWritable(10));
+    writer.append(new IntWritable(i++), new DoubleWritable(11));
+    writer.append(new IntWritable(i++), new DoubleWritable(11));
+    writer.append(new IntWritable(i++), new DoubleWritable(13));
+    writer.close();
+  }
+
+  // Sanity check: rounded draws from Normal(5, 3) should have a sample std dev close to 3.
+  @Test
+  public void testStdDev2() throws Exception {
+    Path input = getTestTempFilePath("stdDev/counts.file");
+    Path output = getTestTempFilePath("stdDev/output.file");
+    FileSystem fs = FileSystem.get(input.toUri(), conf);
+    SequenceFile.Writer writer = new SequenceFile.Writer(fs, conf, input, IntWritable.class,
+            DoubleWritable.class);
+    Random random = RandomUtils.getRandom();
+    Normal normal = new Normal(5, 3, random);
+    for (int i = 0; i < 1000000; i++) {
+      writer.append(new IntWritable(i), new DoubleWritable(normal.nextInt()));
+    }
+    writer.close();
+    double v = BasicStats.stdDev(input, output, conf);
+    assertEquals(3, v, 0.02);
+  }
+
+}
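
The expected constants follow directly from the ten fixed values {7, 9, 9, 10, 10, 10,
10, 11, 11, 13}: the mean is 10, the sample variance is sum((x - 10)^2) / (n - 1) =
22/9 ~ 2.444, its square root is ~1.563, and the same formula around a supplied mean of
0 gives sqrt(1022/9) ~ 10.656. A few lines of plain Java confirming the arithmetic
(a standalone check, not part of the test; the class name is illustrative):

    public class BasicStatsArithmetic {
      public static void main(String[] args) {
        double[] xs = { 7, 9, 9, 10, 10, 10, 10, 11, 11, 13 };
        double mean = 0;
        for (double x : xs) { mean += x; }
        mean /= xs.length;                                             // 10.0
        double ss = 0;             // sum of squares around the mean
        double ssAroundZero = 0;   // sum of squares around 0
        for (double x : xs) {
          ss += (x - mean) * (x - mean);                               // totals 22.0
          ssAroundZero += x * x;                                       // totals 1022.0
        }
        System.out.println(ss / (xs.length - 1));                      // 2.444... (variance)
        System.out.println(Math.sqrt(ss / (xs.length - 1)));           // 1.563... (std dev)
        System.out.println(Math.sqrt(ssAroundZero / (xs.length - 1))); // 10.656...
      }
    }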

http://git-wip-us.apache.org/repos/asf/mahout/blob/5eda9e1f/community/mahout-mr/src/test/java/org/apache/mahout/math/hadoop/stochasticsvd/LocalSSVDPCASparseTest.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/src/test/java/org/apache/mahout/math/hadoop/stochasticsvd/LocalSSVDPCASparseTest.java b/community/mahout-mr/src/test/java/org/apache/mahout/math/hadoop/stochasticsvd/LocalSSVDPCASparseTest.java
new file mode 100644
index 0000000..6a194dd
--- /dev/null
+++ b/community/mahout-mr/src/test/java/org/apache/mahout/math/hadoop/stochasticsvd/LocalSSVDPCASparseTest.java
@@ -0,0 +1,296 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.hadoop.stochasticsvd;
+
+import com.google.common.collect.Lists;
+import com.google.common.io.Closeables;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.compress.DefaultCodec;
+import org.apache.mahout.common.IOUtils;
+import org.apache.mahout.common.MahoutTestCase;
+import org.apache.mahout.common.Pair;
+import org.apache.mahout.common.RandomUtils;
+import org.apache.mahout.math.*;
+import org.apache.mahout.math.function.DoubleFunction;
+import org.apache.mahout.math.function.Functions;
+import org.apache.mahout.math.function.VectorFunction;
+import org.junit.Test;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.util.Deque;
+import java.util.Iterator;
+import java.util.Random;
+
+public class LocalSSVDPCASparseTest extends MahoutTestCase {
+
+  private static final double s_epsilon = 1.0E-10d;
+
+  @Test
+  public void testOmegaTRightMultiply() {
+    final Random rnd = RandomUtils.getRandom();
+    final long seed = rnd.nextLong();
+    final int n = 2000;
+
+    final int kp = 100;
+
+    final Omega omega = new Omega(seed, kp);
+    final Matrix materializedOmega = new DenseMatrix(n, kp);
+    for (int i = 0; i < n; i++)
+      for (int j = 0; j < kp; j++)
+        materializedOmega.setQuick(i, j, omega.getQuick(i, j));
+    Vector xi = new DenseVector(n);
+    xi.assign(new DoubleFunction() {
+      @Override
+      public double apply(double x) {
+        return rnd.nextDouble() * 100;
+      }
+    });
+
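+    // NB: mutlithreadedTRightMultiply is the actual (misspelled) method name in Omega.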
+    Vector s_o = omega.mutlithreadedTRightMultiply(xi);
+
+    Matrix xiVector = new DenseMatrix(n, 1);
+    xiVector.assignColumn(0, xi);
+
+    Vector s_o_control = materializedOmega.transpose().times(xiVector).viewColumn(0);
+
+    assertEquals(0, s_o.minus(s_o_control).aggregate(Functions.PLUS, Functions.ABS), 1e-10);
+
+    System.out.printf("s_omega=\n%s\n", s_o);
+    System.out.printf("s_omega_control=\n%s\n", s_o_control);
+  }
+
+  @Test
+  public void runPCATest1() throws IOException {
+    runSSVDSolver(1);
+  }
+
+//  @Test
+  public void runPCATest0() throws IOException {
+    runSSVDSolver(0);
+  }
+
+
+  public void runSSVDSolver(int q) throws IOException {
+
+    Configuration conf = getConfiguration();
+    conf.set("mapred.job.tracker", "local");
+    conf.set("fs.default.name", "file:///");
+
+    // conf.set("mapred.job.tracker","localhost:11011");
+    // conf.set("fs.default.name","hdfs://localhost:11010/");
+
+    Deque<Closeable> closeables = Lists.newLinkedList();
+    try {
+      Random rnd = RandomUtils.getRandom();
+
+      File tmpDir = getTestTempDir("svdtmp");
+      conf.set("hadoop.tmp.dir", tmpDir.getAbsolutePath());
+
+      Path aLocPath = new Path(getTestTempDirPath("svdtmp/A"), "A.seq");
+
+      // create distributed row matrix-like struct
+      SequenceFile.Writer w =
+        SequenceFile.createWriter(FileSystem.getLocal(conf),
+                                  conf,
+                                  aLocPath,
+                                  Text.class,
+                                  VectorWritable.class,
+                                  CompressionType.BLOCK,
+                                  new DefaultCodec());
+      closeables.addFirst(w);
+
+      int n = 100;
+      int m = 2000;
+      double percent = 5;
+
+      VectorWritable vw = new VectorWritable();
+      Text rkey = new Text();
+
+      Vector xi = new DenseVector(n);
+
+      double muAmplitude = 50.0;
+      for (int i = 0; i < m; i++) {
+        Vector dv = new SequentialAccessSparseVector(n);
+        String rowname = "row-"+i;
+        NamedVector namedRow = new NamedVector(dv, rowname);
+        for (int j = 0; j < n * percent / 100; j++) {
+          dv.setQuick(rnd.nextInt(n), muAmplitude * (rnd.nextDouble() - 0.25));
+        }
+        rkey.set(rowname); // keep the sequence-file key consistent with the vector name
+        vw.set(namedRow);
+        w.append(rkey, vw);
+        xi.assign(dv, Functions.PLUS);
+      }
+      closeables.remove(w);
+      Closeables.close(w, false);
+
+      xi.assign(Functions.mult(1.0 / m));
+
+      FileSystem fs = FileSystem.get(conf);
+
+      Path tempDirPath = getTestTempDirPath("svd-proc");
+      Path aPath = new Path(tempDirPath, "A/A.seq");
+      fs.copyFromLocalFile(aLocPath, aPath);
+      Path xiPath = new Path(tempDirPath, "xi/xi.seq");
+      SSVDHelper.saveVector(xi, xiPath, conf);
+
+      Path svdOutPath = new Path(tempDirPath, "SSVD-out");
+
+      // make sure we wipe out previous test results, just a convenience
+      fs.delete(svdOutPath, true);
+
+      // Solver starts here:
+      System.out.println("Input prepared, starting solver...");
+
+      int ablockRows = 867;
+      int p = 60;
+      int k = 40;
+      SSVDSolver ssvd =
+        new SSVDSolver(conf,
+                       new Path[]{aPath},
+                       svdOutPath,
+                       ablockRows,
+                       k,
+                       p,
+                       3);
+      ssvd.setOuterBlockHeight(500);
+      ssvd.setAbtBlockHeight(251);
+      ssvd.setPcaMeanPath(xiPath);
+
+    /*
+     * Removing the V and U jobs from this test to reduce running time; they are
+     * still exercised in the dense test.
+     *
+     * For the PCA test, we also want to request the U*Sigma output and check it
+     * for named-vector propagation.
+     */
+      ssvd.setComputeU(false);
+      ssvd.setComputeV(false);
+      ssvd.setcUSigma(true);
+
+      ssvd.setOverwrite(true);
+      ssvd.setQ(q);
+      ssvd.setBroadcast(true);
+      ssvd.run();
+
+      Vector stochasticSValues = ssvd.getSingularValues();
+
+      // try to run the same thing without stochastic algo
+      Matrix a = SSVDHelper.drmLoadAsDense(fs, aPath, conf);
+
+      verifyInternals(svdOutPath, a, new Omega(ssvd.getOmegaSeed(), k + p), k + p, q);
+
+      // subtract pseudo pca mean
+      for (int i = 0; i < m; i++) {
+        a.viewRow(i).assign(xi, Functions.MINUS);
+      }
+
+      SingularValueDecomposition svd2 =
+        new SingularValueDecomposition(a);
+
+      Vector svalues2 = new DenseVector(svd2.getSingularValues());
+
+      System.out.println("--SSVD solver singular values:");
+      LocalSSVDSolverSparseSequentialTest.dumpSv(stochasticSValues);
+      System.out.println("--SVD solver singular values:");
+      LocalSSVDSolverSparseSequentialTest.dumpSv(svalues2);
+
+      for (int i = 0; i < k + p; i++) {
+        assertTrue(Math.abs(svalues2.getQuick(i) - stochasticSValues.getQuick(i)) <= s_epsilon);
+      }
+
+      DenseMatrix mQ =
+        SSVDHelper.drmLoadAsDense(fs, new Path(svdOutPath, "Bt-job/"
+          + BtJob.OUTPUT_Q + "-*"), conf);
+
+      SSVDCommonTest.assertOrthonormality(mQ,
+                                          false,
+                                          s_epsilon);
+
+      // assert name propagation
+      for (Iterator<Pair<Writable, Vector>> iter = SSVDHelper.drmIterator(fs,
+                                                                          new Path(ssvd.getuSigmaPath()+"/*"),
+                                                                          conf,
+                                                                          closeables); iter.hasNext(); ) {
+        Pair<Writable, Vector> pair = iter.next();
+        Writable key = pair.getFirst();
+        Vector v = pair.getSecond();
+
+        assertTrue(v instanceof NamedVector);
+        assertTrue(key instanceof Text);
+      }
+
+    } finally {
+      IOUtils.close(closeables);
+    }
+  }
+
+  private void verifyInternals(Path tempDir, Matrix a, Omega omega, int kp, int q) {
+    int m = a.numRows();
+    int n = a.numCols();
+
+    Vector xi = a.aggregateColumns(new VectorFunction() {
+      @Override
+      public double apply(Vector v) {
+        return v.zSum() / v.size();
+      }
+    });
+
+    // materialize omega
+    Matrix momega = new DenseMatrix(n, kp);
+    for (int i = 0; i < n; i++)
+      for (int j = 0; j < kp; j++)
+        momega.setQuick(i, j, omega.getQuick(i, j));
+
+    Vector s_o = omega.mutlithreadedTRightMultiply(xi);
+
+    System.out.printf("s_omega=\n%s\n", s_o);
+
+    Matrix y = a.times(momega);
+    // every row of Y = (A - 1*xi^T)*Omega subtracts s_o = Omega^T*xi
+    for (int i = 0; i < m; i++) y.viewRow(i).assign(s_o, Functions.MINUS);
+
+    QRDecomposition qr = new QRDecomposition(y);
+    Matrix qm = qr.getQ();
+
+    Vector s_q = qm.aggregateColumns(new VectorFunction() {
+      @Override
+      public double apply(Vector v) {
+        return v.zSum();
+      }
+    });
+
+    System.out.printf("s_q=\n%s\n", s_q);
+
+    Matrix b = qm.transpose().times(a);
+
+    Vector s_b = b.times(xi);
+
+    System.out.printf("s_b=\n%s\n", s_b);
+  }
+
+}
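
The PCA path exercised by this test rests on an identity that lets the solver fold the
column-mean vector xi into the projection without densifying the sparse input: with 1
denoting an all-ones column vector,

    Y = (A - 1 * xi^T) * Omega = A * Omega - 1 * (Omega^T * xi)^T

so it suffices to compute s_o = Omega^T * xi once (the mutlithreadedTRightMultiply call
above) and subtract it from every row of A * Omega, which is what verifyInternals mirrors.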

http://git-wip-us.apache.org/repos/asf/mahout/blob/5eda9e1f/community/mahout-mr/src/test/java/org/apache/mahout/math/hadoop/stochasticsvd/LocalSSVDSolverDenseTest.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/src/test/java/org/apache/mahout/math/hadoop/stochasticsvd/LocalSSVDSolverDenseTest.java b/community/mahout-mr/src/test/java/org/apache/mahout/math/hadoop/stochasticsvd/LocalSSVDSolverDenseTest.java
new file mode 100644
index 0000000..784c7a5
--- /dev/null
+++ b/community/mahout-mr/src/test/java/org/apache/mahout/math/hadoop/stochasticsvd/LocalSSVDSolverDenseTest.java
@@ -0,0 +1,206 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.hadoop.stochasticsvd;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.mahout.common.MahoutTestCase;
+import org.apache.mahout.math.DenseMatrix;
+import org.apache.mahout.math.DenseVector;
+import org.apache.mahout.math.Vector;
+import org.junit.Test;
+
+/**
+ * Tests the SSVD solver on made-up data, running the Hadoop solver in local
+ * mode. It requests a full-rank SSVD and then compares the singular values to
+ * those of Colt's SVD, asserting a precision (epsilon) of 1e-10, or whatever
+ * value is currently configured.
+ */
+public class LocalSSVDSolverDenseTest extends MahoutTestCase {
+
+  private static final double s_epsilon = 1.0E-10d;
+
+  /*
+   * The worst-case error observed for this test is about 3%, but since the
+   * test is non-deterministic it may still occasionally produce bad results
+   * with non-zero probability, so this error margin is set high enough that
+   * the test (almost) never fails.
+   */
+  private static final double s_precisionPct = 10;
+
+  @Test
+  public void testSSVDSolverDense() throws IOException {
+    runSSVDSolver(0);
+  }
+
+  @Test
+  public void testSSVDSolverPowerIterations1() throws IOException {
+    runSSVDSolver(1);
+  }
+
+  // remove from active tests to save time.
+  /* 
+  @Test
+  public void testSSVDSolverPowerIterations2() throws IOException {
+    runSSVDSolver(2);
+  }
+   */
+
+  public void runSSVDSolver(int q) throws IOException {
+
+    Configuration conf = getConfiguration();
+    conf.set("mapred.job.tracker", "local");
+    conf.set("fs.default.name", "file:///");
+
+    // conf.set("mapred.job.tracker","localhost:11011");
+    // conf.set("fs.default.name","hdfs://localhost:11010/");
+
+    File tmpDir = getTestTempDir("svdtmp");
+    conf.set("hadoop.tmp.dir", tmpDir.getAbsolutePath());
+
+    Path aLocPath = new Path(getTestTempDirPath("svdtmp/A"), "A.seq");
+
+    // create a distributed row matrix-like structure (generated below via SSVDTestsHelper)
+
+    // Make the input equivalent to ~2 million non-zero elements.
+    // With 100 million the precision only improves (law of large numbers),
+    // and with an oversampling parameter of 100 there is no error at all.
+    int n = 100;
+    int m = 2000;
+    Vector singularValues =
+      new DenseVector(new double[] { 10, 4, 1, 0.1, 0.1, 0.1, 0.1, 0.1, 0.1,
+          0.1, 0.1, 0.1, 0.1, 0.1, 0.1, 0.1, 0.1, 0.1, 0.1, 0.1, 0.1, 0.1, 0.1,
+          0.1, 0.1, 0.1, 0.1, 0.1, 0.1, 0.1, 0.1, 0.1, 0.1, 0.1, 0.1, 0.1, 0.1,
+          0.1, 0.1, 0.1, 0.1, 0.1, 0.1 });
+
+    SSVDTestsHelper.generateDenseInput(aLocPath,
+                                       FileSystem.getLocal(conf),
+                                       singularValues,
+                                       m,
+                                       n);
+
+    FileSystem fs = FileSystem.get(aLocPath.toUri(), conf);
+
+    Path tempDirPath = getTestTempDirPath("svd-proc");
+    Path aPath = new Path(tempDirPath, "A/A.seq");
+    fs.copyFromLocalFile(aLocPath, aPath);
+
+    Path svdOutPath = new Path(tempDirPath, "SSVD-out");
+
+    // Solver starts here:
+    System.out.println("Input prepared, starting solver...");
+
+    int ablockRows = 867;
+    int p = 10;
+    int k = 3;
+    SSVDSolver ssvd =
+      new SSVDSolver(conf,
+                     new Path[] { aPath },
+                     svdOutPath,
+                     ablockRows,
+                     k,
+                     p,
+                     3);
+    /*
+     * these are only tiny-test values to simulate high load cases, in reality
+     * one needs much bigger
+     */
+    ssvd.setOuterBlockHeight(500);
+    ssvd.setAbtBlockHeight(400);
+    ssvd.setOverwrite(true);
+    ssvd.setQ(q);
+    ssvd.setBroadcast(false);
+    ssvd.run();
+
+    Vector stochasticSValues = ssvd.getSingularValues();
+    System.out.println("--SSVD solver singular values:");
+    dumpSv(stochasticSValues);
+
+    // The full-rank SVD for this test size takes too long to run, so it is
+    // commented out; instead, the resulting singular values are compared to
+    // the original values used to generate the input (which are guaranteed
+    // to be correct).
+
+    /*
+     * System.out.println("--Colt SVD solver singular values:");
+     *
+     * // try to run the same thing without the stochastic algorithm
+     * double[][] a = SSVDSolver.drmLoadAsDense(fs, aPath, conf);
+     *
+     * SingularValueDecomposition svd2 =
+     *   new SingularValueDecomposition(new DenseMatrix(a));
+     *
+     * a = null;
+     *
+     * double[] svalues2 = svd2.getSingularValues();
+     * dumpSv(svalues2);
+     *
+     * for (int i = 0; i < k; i++) {
+     *   Assert.assertTrue(1 - Math.abs((svalues2[i] - stochasticSValues[i]) / svalues2[i])
+     *       <= s_precisionPct / 100);
+     * }
+     */
+
+    // assert first k against those
+    // used to generate surrogate input
+
+    for (int i = 0; i < k; i++) {
+      assertTrue(Math.abs((singularValues.getQuick(i) - stochasticSValues.getQuick(i))
+          / singularValues.getQuick(i)) <= s_precisionPct / 100);
+    }
+
+    DenseMatrix mQ =
+      SSVDHelper.drmLoadAsDense(fs, new Path(svdOutPath, "Bt-job/"
+        + BtJob.OUTPUT_Q + "-*"), conf);
+
+    SSVDCommonTest.assertOrthonormality(mQ,
+                                        false,
+                                        s_epsilon);
+
+    DenseMatrix u =
+      SSVDHelper.drmLoadAsDense(fs,
+                                new Path(svdOutPath, "U/*"),
+                                conf);
+    SSVDCommonTest.assertOrthonormality(u, false, s_epsilon);
+
+    DenseMatrix v =
+      SSVDHelper.drmLoadAsDense(fs,
+                                new Path(svdOutPath, "V/*"),
+                                conf);
+    SSVDCommonTest.assertOrthonormality(v, false, s_epsilon);
+  }
+
+  static void dumpSv(Vector s) {
+    System.out.print("svs: ");
+    for (Vector.Element el : s.all()) {
+      System.out.printf("%f  ", el.get());
+    }
+    System.out.println();
+  }
+
+}
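
The assertOrthonormality checks used above amount to verifying that Q^T * Q is the
identity within epsilon. A minimal sketch of such a check over a Mahout matrix
(illustrative only; the actual SSVDCommonTest helper may differ in details):

    import org.apache.mahout.math.Matrix;

    final class OrthoCheck {
      // Fails if any entry of the Gram matrix Q^T * Q deviates from I by more than eps.
      static void assertOrthonormal(Matrix q, double eps) {
        Matrix gram = q.transpose().times(q);   // k x k Gram matrix
        for (int i = 0; i < gram.numRows(); i++) {
          for (int j = 0; j < gram.numCols(); j++) {
            double expected = (i == j) ? 1.0 : 0.0;
            if (Math.abs(gram.getQuick(i, j) - expected) > eps) {
              throw new AssertionError("not orthonormal at (" + i + "," + j + ")");
            }
          }
        }
      }
    }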