Posted to dev@mahout.apache.org by Shannon Quinn <sq...@gatech.edu> on 2010/06/09 07:31:30 UTC

CLI input formats and calling other jobs

Hi all,

I have a few more questions regarding the inner workings of Mahout's command
line and data types. I apologize in advance for the naivete...and the length
:P

I'm working on the spectral clustering algorithm, and inherent to that
strategy is the fact that the algorithm works on pairwise data point
comparisons, not the raw data itself. Put another way, if I want to
implement a clustering algorithm for Photoshop's magic wand, the pixel
intensities constitute the raw data, but what my algorithm actually works on
is a pairwise comparison of all the pixels (often a normalized form of
Euclidean distance with some KNN sprinkled in).

This preprocessing step - where n-dimensional data is converted to
matrix-form pairwise comparisons, or "similarities" - there are a lot of
options for how to implement this. My current strategy is for the algorithm
to accept data that has already been processed into similarity matrix form
and thereby leave the preprocessing to the user, but also to provide an
example using images and a script that converts the raw data to similarity
scores. Would this be a good setup? Any other suggestions? (I'd thought of
somehow adding a plugin architecture to allow for different implementations
of performing this similarity preprocessing, but I'm not sure where that
would mesh with the overall Mahout architecture, or that there would even be
time this summer for it)

Also, input data types: I've been using LDA and k-means as templates for my
work, and they make heavy use of SequenceFiles as their input format.
However, I'm unsure of how to further manipulate the data after using
seq2sparse on a CSV input file representing an NxN symmetric similarity
matrix, since k-means and LDA operate on text in all the available examples.

My other questions revolve around clarifications on the codebase. Right now
I'm implementing a canonical k-means spectral clustering algorithm, which if
you want greater detail on the theory you can read the post on my blog (
http://spectrallyclustered.wordpress.com/2010/06/05/sprint-1-k-means-spectral-clustering/),
but the basic steps I'll reproduce here:

1) Convert raw data to similarity, or "affinity", scores -> matrix A (done
internally)
2) Build diagonal matrix D of degree vertices (basically sum of rows of A)
Is there a more efficient way to do this step than the following:

Matrix D = new SparseMatrix(cardinality);
for (int i = 0; i < A.numRows(); i++) {
  // D_ii = degree of vertex i = sum of row i of A
  D.set(i, i, A.getRow(i).zSum());
}

3) Construct a "normalized" Laplacian via the formula: L = D^(-1/2)AD^(-1/2)
I have this:

Matrix L = D.times(A.times(D));

Since D is a diagonal matrix, the exponent operation is simply raising each
individual matrix element to the -0.5 power; I assume this involves
implementing a new UnaryFunction? Or does a method already exist (aside from
manually looping over the matrix diagonal)?

4) Perform eigen-decomposition on L.
This is where DistributedLanczosSolver comes in, but I'm not sure how to
fire off the job from within another job since the run() method requires
command-line parameters.

5) Perform k-means clustering on rows of matrix U consisting of column
eigenvectors of L.
Same problem as #4 - how to invoke the existing k-means job from within my
own job.

I know a lot of this is inexperience, so again I apologize, but I do
appreciate your patience :)

Regards,
Shannon

Re: CLI input formats and calling other jobs

Posted by Jeff Eastman <jd...@windwardsolutions.com>.
Hi Shannon,

I'll take a crack at some of your questions, see below.

On 6/8/10 10:31 PM, Shannon Quinn wrote:
> Hi all,
>
> I have a few more questions regarding the inner workings of Mahout's command
> line and data types. I apologize in advance for the naivete...and the length
> :P
>
> I'm working on the spectral clustering algorithm, and inherent to that
> strategy is the fact that the algorithm works on pairwise data point
> comparisons, not the raw data itself. Put another way, if I want to
> implement a clustering algorithm for Photoshop's magic wand, the pixel
> intensities constitute the raw data, but what my algorithm actually works on
> is a pairwise comparison of all the pixels (often a normalized form of
> Euclidean distance with some KNN sprinkled in).
>
> This preprocessing step - where n-dimensional data is converted to
> matrix-form pairwise comparisons, or "similarities" - there are a lot of
> options for how to implement this. My current strategy is for the algorithm
> to accept data that has already been processed into similarity matrix form
> and thereby leave the preprocessing to the user, but also to provide an
> example using images and a script that converts the raw data to similarity
> scores. Would this be a good setup? Any other suggestions? (I'd thought of
> somehow adding a plugin architecture to allow for different implementations
> of performing this similarity preprocessing, but I'm not sure where that
> would mesh with the overall Mahout architecture, or that there would even be
> time this summer for it)
>
> Also, input data types: I've been using LDA and k-means as templates for my
> work, and they make heavy use of SequenceFiles as their input format.
> However, I'm unsure of how to further manipulate the data after using
> seq2sparse on a CSV input file representing an NxN symmetric similarity
> matrix, since k-means and LDA operate on text in all the available examples.
>    
k-Means and LDA really operate on pure vectors. It is true that many of 
our examples produce these vectors by analyzing text documents but 
neither algorithm requires textual data. I suggest you look at the 
examples/s/m/j/o/a/m/clustering/syntheticcontrol/kmeans/Job. This job 
invokes a preprocessor (InputDriver.runJob()) on a CSV file to convert 
the file into Mahout Vector sequence files suitable for input to the 
clustering jobs. It then invokes CanopyDriver.runJob() on the vector 
sequence files to determine the initial k clusters for input to k-Means. 
Finally the KMeansDriver.runJob() operation completes the clustering. 
You can use this sort of job chaining in your own driver to orchestrate 
your various processing steps.
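
To make the shape of that chaining concrete, here is a rough skeleton (every
class and method name below is just a placeholder, not the real signatures -
copy the actual runJob() calls and their argument lists out of the
syntheticcontrol Job):

   // Placeholder sketch of a chained driver.  The real argument lists for
   // InputDriver/CanopyDriver/KMeansDriver.runJob() should be taken from the
   // syntheticcontrol kmeans Job class.
   public final class SpectralKMeansJob {

     private SpectralKMeansJob() {
     }

     public static void main(String[] args) throws Exception {
       String input = args[0];    // raw or affinity data on HDFS
       String output = args[1];   // working directory

       // step 1: convert the raw input into Mahout Vector sequence files
       // (syntheticcontrol does this with InputDriver.runJob())
       prepareInput(input, output + "/vectors");

       // step 2: run your own spectral preprocessing jobs
       buildLaplacianAndDecompose(output + "/vectors", output + "/eigen");

       // step 3: hand the eigenvector rows to the existing clustering drivers
       // (syntheticcontrol chains CanopyDriver.runJob() and then
       // KMeansDriver.runJob() at this point, with plain Java arguments)
       clusterEigenRows(output + "/eigen", output + "/clusters");
     }

     // empty placeholders standing in for the individual runJob() calls
     private static void prepareInput(String in, String out) {
     }

     private static void buildLaplacianAndDecompose(String in, String out) {
     }

     private static void clusterEigenRows(String in, String out) {
     }
   }
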
> My other questions revolve around clarifications on the codebase. Right now
> I'm implementing a canonical k-means spectral clustering algorithm, which if
> you want greater detail on the theory you can read the post on my blog (
> http://spectrallyclustered.wordpress.com/2010/06/05/sprint-1-k-means-spectral-clustering/),
> but the basic steps I'll reproduce here:
>
> 1) Convert raw data to similarity, or "affinity", scores ->  matrix A (done
> internally)
> 2) Build diagonal matrix D of degree vertices (basically sum of rows of A)
> Is there a more efficient way to do this step than the following:
>
> Matrix D = new SparseMatrix(cardinality);
> for (int i = 0; i < A.numRows(); i++) {
> D.set(i, i, A.getRow(i).zSum());
> }
>    
This looks pretty reasonable to me.
> 3) Construct a "normalized" Laplacian via the formula: L = D^(-1/2)AD^(-1/2)
> I have this:
>
> Matrix L = D.times(A.times(D));
>
> Since D is a diagonal matrix, the exponent operation is simply raising each
> individual matrix element to the -0.5 power; I assume this involves
> implementing a new UnaryFunction? Or does a method already exist (aside from
> manually looping over the matrix diagonal)?
>    
You could use a UnaryFunction and trust the Matrix code to iterate over 
the diagonals efficiently using sparse vectors (it might do well, you 
could test this) but it is a pretty simple loop to write too.
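
For example, a small in-memory version of the simple loop (variable names
follow your snippet above; the zero-degree guard is my own addition):

   Matrix dInvSqrt = new SparseMatrix(cardinality);
   for (int i = 0; i < A.numRows(); i++) {
     double degree = A.getRow(i).zSum();
     // D^(-1/2)_ii = 1/sqrt(degree); guard against isolated, zero-degree points
     dInvSqrt.set(i, i, degree > 0.0 ? 1.0 / Math.sqrt(degree) : 0.0);
   }
   Matrix L = dInvSqrt.times(A.times(dInvSqrt));   // L = D^(-1/2) A D^(-1/2)
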
> 4) Perform eigen-decomposition on L.
> This is where DistributedLanczosSolver comes in, but I'm not sure how to
> fire off the job from within another job since the run() method requires
> command-line parameters.
>
>    
The DistributedLanczosSolver.run method does require command line 
parameters and it is factored a bit differently than the clustering jobs 
in this regard. It is pretty simple to factor out a runJob method that 
can be called with pure Java arguments. I don't know why this could not 
be committed if it helps you use it, but I'll leave that decision to 
somebody who is actively working in that code.

   public void runJob(Configuration originalConfig, String inputPathString,
       String outputTmpPathString, int numRows, int numCols, boolean isSymmetric,
       int desiredRank, Matrix eigenVectors, List<Double> eigenValues,
       String outputEigenVectorPath) throws IOException {
     DistributedRowMatrix matrix = new DistributedRowMatrix(inputPathString,
                                                            outputTmpPathString,
                                                            numRows,
                                                            numCols);
     matrix.configure(new JobConf(originalConfig));
     solve(matrix, desiredRank, eigenVectors, eigenValues, isSymmetric);
     serializeOutput(eigenVectors, eigenValues, outputEigenVectorPath);
   }

> 5) Perform k-means clustering on rows of matrix U consisting of column
> eigenvectors of L.
> Same problem as #4 - how to invoke the existing k-means job from within my
> own job.
>    
Just invoke the runJob method as in syntheticcontrol, above. Also the 
respective unit tests for the clustering algorithms all have 
programmatic invocation examples.
> I know a lot of this is inexperience, so again I apologize, but I do
> appreciate your patience :)
>
> Regards,
> Shannon
>
>    


Re: CLI input formats and calling other jobs

Posted by Isabel Drost <is...@apache.org>.
On Fri Shannon Quinn <sq...@gatech.edu> wrote:
> 3) I noticed JobConf has been deprecated by Hadoop 0.20.2, but it's
> still used by DistributedRowMatrix. I've been seeing all the tickets
> about upgrading to the current Hadoop APIs, so I assume this is on
> the to-do list. I'd be happy to help whoever is working on this
> particular item, or start working on it if there isn't one.

The work on DistributedRowMatrix has mostly been done by Jake. However
with the discussions on upgrading to the new APIs and with several jobs
already on the new API I think your input here would be most welcome.
Sean, I guess you have another pair of helping hands with your
upgrading effort.

Isabel

Re: CLI input formats and calling other jobs

Posted by Shannon Quinn <sq...@gatech.edu>.
> Please don't call it V.  That is normally the name of the other matrix of
> singular vectors in SVD.  Calling a row normalized version of U by that
> name
> would be terminally confusing.


Excellent point. I'll call it W.

Some other questions:

1) In converting my code from using Matrix objects (Dense, Sparse) to
DistributedRowMatrix's, I've run into the problem of not being able to
perform some of the basic Matrix operations, such as zSum(), or to raise
elements to a power via UnaryFunctions, etc. I could certainly create
Map/Reduce jobs to do these tasks, but is this functionality that could be
included in DistributedRowMatrix itself?

2) For debugging purposes (since I'm using data sets small enough to be held
in memory), I set up a loop over my DistributedRowMatrix (after initializing
it and calling .configure() ):

for (MatrixSlice m : A) {
  System.out.println(m.vector().zSum());
}

However, I received an exception on the line:

Exception in thread "main" java.lang.IllegalStateException:
java.io.IOException: wrong value class:
org.apache.mahout.math.VectorWritable@6f649b44 is not class
org.apache.hadoop.io.Text

My raw data resides as a CSV file, on which I've run seqdirectory, and I'm
passing the path to the SequenceFiles to the DistributedRowMatrix
constructor. I've looked over the syntheticcontrol example and the "Creating
Vectors from Text" wiki page and am wondering if I'm missing something very
simple. In the syntheticcontrol example, should I simply be performing a job
like it does, converting the SequenceFile's from one format to another, and
then passing those to the DistributedRowMatrix constructor? Or is it
something else?

3) I noticed JobConf has been deprecated by Hadoop 0.20.2, but it's still
used by DistributedRowMatrix. I've been seeing all the tickets about
upgrading to the current Hadoop APIs, so I assume this is on the to-do list.
I'd be happy to help whoever is working on this particular item, or start
working on it if there isn't one.

Thank you again for your help!

Regards,
Shannon

Re: CLI input formats and calling other jobs

Posted by Ted Dunning <te...@gmail.com>.
Please don't call it V.  That is normally the name of the other matrix of
singular vectors in SVD.  Calling a row normalized version of U by that name
would be terminally confusing.

On Wed, Jun 9, 2010 at 7:14 PM, Shannon Quinn <sq...@gatech.edu> wrote:

> I purposely left this out since I figured it wasn't relevant to the
> immediate questions I had, but you're absolutely correct that U needs to be
> normalized (call it V).
>

Re: CLI input formats and calling other jobs

Posted by Shannon Quinn <sq...@gatech.edu>.
Hi everyone,

Thanks so much for the feedback!

> I think in the long run, having the ability to take either symmetric input
> (assume that it is already the matrix of similarities), or the "raw" input,
> would be nice.  For now, whatever is easiest for you to work with should be
> fine.


Theoretically, it is actually feasible to assume non-symmetric
affinity/similarity matrices, though this equates to a non-trivial
stochastic decomposition...so for the time being, I'm assuming symmetric :P

> Sticking with SequenceFile is the way to go, with the Writable type dictated
> by what it is you're doing - when they're vectors, use VectorWritable, and
> when you just need to pass around some coefficients, you can send
> IntWritable or your own custom writable.


IntWritable, DoubleWritable, and VectorWritable should serve beautifully.
Just so I'm absolutely clear, though: the <IntWritable> is a row index to
the row of <VectorWritable>, correct?
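
Assuming the answer is yes, here is a tiny sketch of the on-disk format I'm
picturing, just to make sure I have it right (the path and the toy 2x2 matrix
are made up; imports are the usual org.apache.hadoop.{conf,fs,io} and
org.apache.mahout.math ones):

  Configuration conf = new Configuration();
  FileSystem fs = FileSystem.get(conf);
  Path path = new Path("affinity/part-00000");   // hypothetical output path

  // two toy rows of a 2x2 affinity matrix
  Vector[] rows = {
      new DenseVector(new double[] {0.0, 0.5}),
      new DenseVector(new double[] {0.5, 0.0})
  };

  SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf, path,
      IntWritable.class, VectorWritable.class);
  try {
    for (int i = 0; i < rows.length; i++) {
      // key = row index, value = that row of the similarity matrix
      writer.append(new IntWritable(i), new VectorWritable(rows[i]));
    }
  } finally {
    writer.close();
  }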


> So your matrix A is waaaaay too big to just fit in memory, right?  So
> this code won't simply work on Big (or even Medium) Data.
>

That's an excellent point; these similarity matrices can be millions x
millions.


>
> You need to write a MapReduce job which takes your
> SequenceFile<IntWritable,VectorWritable> input, and does what
> your inner loop effectively does.  You can probably have a single
> Reducer which takes in all of the outputs of your Map job and builds
> up a big Vector of results - you don't need a SparseMatrix, because
> it's diagonal, it can be represented as a (Dense) Vector.
>

EigencutsMapper and EigencutsReducer it is, then. Thanks! Also, good point
on the diagonal matrix being represented as a DenseVector.

> Again, since A is a DistributedRowMatrix, there is a better way
> to compute this: if you look at what happens when you pre-multiply
> a matrix A by a diagonal matrix D, you're taking the column i of
> matrix A and multiplying it by d_ii.  When you post-multiply A
> by D, you're taking the j'th *row* of A and multiplying by d_j.
>
> End result: L_ij = d_i a_ij d_j
>

This is very nice; thanks for the reminder.


> The meat of the algorithm is just the "solve" method.  So if you're
> already set up with a (configure()'ed!) DistributedRowMatrix A which
> you want to decompose, then create a Matrix to store the
> eigenvectors, and create a List<Double> to store the eigenvalues,
> and do like Jeff said - just call:
>
>  solver.solve(A, desiredRank, eVectors, eValues, true);
>
> Just make sure that you've called configure() on A before doing this.
>

It's mainly the configuration objects that were throwing me for a loop, but
I think I've got that figured out now. Thanks!


> The rows of U will be your projections of your original similarity matrix
> onto the reduced dimensional space (they'll be Dense!), so yeah, this makes
> sense, but I'm not sure whether you want to normalize by the inverse
> of the eigenvalues or not, first (do you want U, or S^-1 * U? - by
> definition of S, the dispersion of points in the reduced space are going
> to be highly clustered along the first few eigenvector directions if
> you don't normalize...)
>

I purposely left this out since I figured it wasn't relevant to the
immediate questions I had, but you're absolutely correct that U needs to be
normalized (call it V). In the thesis, the rows of U are normalized to be
unit length (v_ij = u_ij / sqrt(sum_j(u_ij^2))), so that's roughly
equivalent to what you mentioned of S^-1 * U in terms of uncoupling the
points. Either approach will probably work.
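
(For the in-memory case, that row normalization is just something like the loop
below; distributed, it would be a trivial map-only pass over the rows of U.)

  // unit-normalize each row of U: v_ij = u_ij / sqrt(sum_j u_ij^2)
  for (int i = 0; i < U.numRows(); i++) {
    Vector row = U.getRow(i);
    double norm = Math.sqrt(row.dot(row));
    if (norm > 0.0) {
      U.assignRow(i, row.divide(norm));
    }
  }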


> One note to add to Jeff's comment: your eigenvectors will live as the
> transpose of what you want for clustering, so you will need to instantiate
> a DistributedRowMatrix based on them (or, if they are small enough,
> just load the contents of the HDFS file into memory), and then call
> transpose().  The results of this are the thing you want to push into
> the kmeans job as input.
>

I was wondering about the matrix row/column orientation in terms of what
kmeans operated on. Thanks!


>
> Hope this helps more than confuses!
>
>
Very much helps. Thank you to you and Jeff, I really appreciate it!

Shannon

Re: CLI input formats and calling other jobs

Posted by Jake Mannix <ja...@gmail.com>.
Hi again Shannon,

On Tue, Jun 8, 2010 at 10:31 PM, Shannon Quinn <sq...@gatech.edu> wrote:
>
>
> I'm working on the spectral clustering algorithm, and inherent to that
> strategy is the fact that the algorithm works on pairwise data point
> comparisons, not the raw data itself. Put another way, if I want to
> implement a clustering algorithm for Photoshop's magic wand, the pixel
> intensities constitute the raw data, but what my algorithm actually works
> on
> is a pairwise comparison of all the pixels (often a normalized form of
> Euclidean distance with some KNN sprinkled in).
>

Yeah, the fact that you are really looking at a graph (aka matrix) in
a more Markovian form makes this interesting, and I wonder whether
there's something creative we can do other than just computing all the
similarities.  But for now, that makes sense as a starting point.


> This preprocessing step - where n-dimensional data is converted to
> matrix-form pairwise comparisons, or "similarities" - there are a lot of
> options for how to implement this. My current strategy is for the algorithm
> to accept data that has already been processed into similarity matrix form
> and thereby leave the preprocessing to the user, but also to provide an
> example using images and a script that converts the raw data to similarity
> scores. Would this be a good setup? Any other suggestions? (I'd thought of
> somehow adding a plugin architecture to allow for different implementations
> of performing this similarity preprocessing, but I'm not sure where that
> would mesh with the overall Mahout architecture, or that there would even
> be
> time this summer for it)
>

I think in the long run, having the ability to take either symmetric input
(assume that it is already the matrix of similarities), or the "raw" input,
would be nice.  For now, whatever is easiest for you to work with should be
fine.


> Also, input data types: I've been using LDA and k-means as templates for my
> work, and they make heavy use of SequenceFiles as their input format.
> However, I'm unsure of how to further manipulate the data after using
> seq2sparse on a CSV input file representing an NxN symmetric similarity
> matrix, since k-means and LDA operate on text in all the available
> examples.
>

Sticking with SequenceFile is the way to go, with the Writable type dictated
by what it is you're doing - when they're vectors, use VectorWritable, and
when you just need to pass around some coefficients, you can send
IntWritable or your own custom writable.


> My other questions revolve around clarifications on the codebase. Right now
> I'm implementing a canonical k-means spectral clustering algorithm, which
> if
> you want greater detail on the theory you can read the post on my blog (
>
> http://spectrallyclustered.wordpress.com/2010/06/05/sprint-1-k-means-spectral-clustering/
> ),
> but the basic steps I'll reproduce here:
>
> 1) Convert raw data to similarity, or "affinity", scores -> matrix A (done
> internally)
> 2) Build diagonal matrix D of degree vertices (basically sum of rows of A)
> Is there a more efficient way to do this step than the following:
>
> Matrix D = new SparseMatrix(cardinality);
> for (int i = 0; i < A.numRows(); i++) {
> D.set(i, i, A.getRow(i).zSum());
> }
>

So your matrix A is waaaaay too big to just fit in memory, right?  So
this code won't simply work on Big (or even Medium) Data.

You need to write a MapReduce job which takes your
SequenceFile<IntWritable,VectorWritable> input, and does what
your inner loop effectively does.  You can probably have a single
Reducer which takes in all of the outputs of your Map job and builds
up a big Vector of results - you don't need a SparseMatrix, because
it's diagonal, it can be represented as a (Dense) Vector.
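
Sketched out, that job is roughly the following (the class names and the
"spectral.num.rows" configuration key are made up, and I'm using the newer
org.apache.hadoop.mapreduce API; run it with a single reducer):

  import java.io.IOException;
  import org.apache.hadoop.io.DoubleWritable;
  import org.apache.hadoop.io.IntWritable;
  import org.apache.hadoop.io.NullWritable;
  import org.apache.hadoop.mapreduce.Mapper;
  import org.apache.hadoop.mapreduce.Reducer;
  import org.apache.mahout.math.DenseVector;
  import org.apache.mahout.math.Vector;
  import org.apache.mahout.math.VectorWritable;

  /** Emits (row index, row sum) for every row of A. */
  class DegreeMapper
      extends Mapper<IntWritable, VectorWritable, IntWritable, DoubleWritable> {
    @Override
    protected void map(IntWritable row, VectorWritable vw, Context ctx)
        throws IOException, InterruptedException {
      ctx.write(row, new DoubleWritable(vw.get().zSum()));   // d_ii = sum_j a_ij
    }
  }

  /** Single reducer: collects every row sum into one dense degree vector. */
  class DegreeReducer
      extends Reducer<IntWritable, DoubleWritable, NullWritable, VectorWritable> {
    private Vector degrees;

    @Override
    protected void setup(Context ctx) {
      // "spectral.num.rows" is a made-up configuration key holding n
      degrees = new DenseVector(ctx.getConfiguration().getInt("spectral.num.rows", 0));
    }

    @Override
    protected void reduce(IntWritable row, Iterable<DoubleWritable> sums, Context ctx) {
      for (DoubleWritable sum : sums) {
        degrees.set(row.get(), sum.get());
      }
    }

    @Override
    protected void cleanup(Context ctx) throws IOException, InterruptedException {
      // with one reducer this is the entire diagonal of D as a single vector
      ctx.write(NullWritable.get(), new VectorWritable(degrees));
    }
  }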


> 3) Construct a "normalized" Laplacian via the formula: L = D^(-1/2)AD^(-1/2)
> I have this:
>
> Matrix L = D.times(A.times(D));
>

Again, since A is a DistributedRowMatrix, there is a better way
to compute this: if you look at what happens when you pre-multiply
a matrix A by a diagonal matrix D, you're taking the column i of
matrix A and multiplying it by d_ii.  When you post-multiply A
by D, you're taking the j'th *row* of A and multiplying by d_j.

End result: L_ij = d_i a_ij d_j

Where in your case, d_i = 1/sqrt(D_ii).

Since D is just a single DenseVector, in the way we were
representing it above, it can be held in memory in all of the
Mappers, and so a single MapReduce job can compute the
matrix L from A and D.
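
In code, that is a map-only pass over the rows of A; something like the mapper
below (again the class name and the "spectral.dinvsqrt.path" key are made up,
and I'm assuming the degree job already wrote the diagonal of D^(-1/2) as a
single VectorWritable):

  import java.io.IOException;
  import java.util.Iterator;
  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.FileSystem;
  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.io.IntWritable;
  import org.apache.hadoop.io.NullWritable;
  import org.apache.hadoop.io.SequenceFile;
  import org.apache.hadoop.mapreduce.Mapper;
  import org.apache.mahout.math.Vector;
  import org.apache.mahout.math.VectorWritable;

  class LaplacianMapper
      extends Mapper<IntWritable, VectorWritable, IntWritable, VectorWritable> {

    private Vector dInvSqrt;   // the diagonal of D^(-1/2), held in memory

    @Override
    protected void setup(Context ctx) throws IOException {
      Configuration conf = ctx.getConfiguration();
      // "spectral.dinvsqrt.path" is a made-up key pointing at the single
      // VectorWritable the degree job wrote (already raised to the -1/2 power)
      Path p = new Path(conf.get("spectral.dinvsqrt.path"));
      SequenceFile.Reader reader =
          new SequenceFile.Reader(FileSystem.get(conf), p, conf);
      try {
        VectorWritable vw = new VectorWritable();
        reader.next(NullWritable.get(), vw);
        dInvSqrt = vw.get();
      } finally {
        reader.close();
      }
    }

    @Override
    protected void map(IntWritable i, VectorWritable row, Context ctx)
        throws IOException, InterruptedException {
      Vector a = row.get();
      Vector l = a.like();
      double di = dInvSqrt.get(i.get());
      Iterator<Vector.Element> it = a.iterateNonZero();
      while (it.hasNext()) {
        Vector.Element e = it.next();
        // L_ij = d_i * a_ij * d_j
        l.set(e.index(), di * e.get() * dInvSqrt.get(e.index()));
      }
      ctx.write(i, new VectorWritable(l));
    }
  }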


> Since D is a diagonal matrix, the exponent operation is simply raising each
> individual matrix element to the -0.5 power; I assume this involves
> implementing a new UnaryFunction? Or does a method already exist (aside
> from
> manually looping over the matrix diagonal)?
>

Look at Functions.java for all of your UnaryFunction needs:

  import static org.apache.mahout.math.function.Functions.*;
  Vector d_inv_sqrt = d.assign(chain(sqrt, inv));

will do 1/sqrt() of all elements in a vector.


> 4) Perform eigen-decomposition on L.
> This is where DistributedLanczosSolver comes in, but I'm not sure how to
> fire off the job from within another job since the run() method requires
> command-line parameters.
>

Jeff is right - this should be refactored so that you can call it from inside
of java.  It can, really, but you need to do some setup:

The meat of the algorithm is just the "solve" method.  So if you're
already set up with a (configure()'ed!) DistributedRowMatrix A which
you want to decompose, then create a Matrix to store the
eigenvectors, and create a List<Double> to store the eigenvalues,
and do like Jeff said - just call:

  solver.solve(A, desiredRank, eVectors, eValues, true);

Just make sure that you've called configure() on A before doing this.
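
End to end, the setup looks roughly like this (the path, the rank and the sizes
are made-up values; conf is your Hadoop Configuration, n is the number of data
points, and the eigenvector matrix dimensions may need adjusting to whatever
solve() expects):

  // A = the n x n Laplacian, already written as a SequenceFile of rows
  DistributedRowMatrix a = new DistributedRowMatrix("/user/shannon/laplacian",
      "/tmp/lanczos-tmp", n, n);
  a.configure(new JobConf(conf));            // don't skip this step

  int desiredRank = 10;
  Matrix eigenVectors = new DenseMatrix(desiredRank, n);
  List<Double> eigenValues = new ArrayList<Double>();

  new DistributedLanczosSolver().solve(a, desiredRank, eigenVectors, eigenValues, true);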



> 5) Perform k-means clustering on rows of matrix U consisting of column
> eigenvectors of L.
>

The rows of U will be your projections of your original similarity matrix
onto the reduced dimensional space (they'll be Dense!), so yeah, this makes
sense, but I'm not sure whether you want to normalize by the inverse
of the eigenvalues or not, first (do you want U, or S^-1 * U? - by
definition of S, the dispersion of points in the reduced space are going
to be highly clustered along the first few eigenvector directions if
you don't normalize...)



> Same problem as #4 - how to invoke the existing k-means job from within my
> own job.
>

One note to add to Jeff's comment: your eigenvectors will live as the
transpose of what you want for clustering, so you will need to instantiate
a DistributedRowMatrix based on them (or, if they are small enough,
just load the contents of the HDFS file into memory), and then call
transpose().  The results of this are the thing you want to push into
the kmeans job as input.
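
Roughly (the paths are made up; desiredRank and n as before, and this assumes
the eigenvectors were serialized as a SequenceFile with one eigenvector per row):

  // desiredRank eigenvectors, each of length n, one per row
  DistributedRowMatrix eigen = new DistributedRowMatrix("/user/shannon/eigenvectors",
      "/tmp/eigen-tmp", desiredRank, n);
  eigen.configure(new JobConf(conf));
  // after the transpose, each of the n rows is one data point in the reduced space
  DistributedRowMatrix points = eigen.transpose();
  // 'points' is what gets handed to the k-means job as its input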

Hope this helps more than confuses!

  -jake