Posted to commits@hama.apache.org by ed...@apache.org on 2009/07/09 09:09:58 UTC
svn commit: r792423 - in /incubator/hama/trunk: ./ src/java/org/apache/hama/
src/java/org/apache/hama/algebra/ src/java/org/apache/hama/io/
src/java/org/apache/hama/mapred/ src/test/org/apache/hama/
Author: edwardyoon
Date: Thu Jul 9 07:09:57 2009
New Revision: 792423
URL: http://svn.apache.org/viewvc?rev=792423&view=rev
Log:
Finds the eigenvalues and eigenvectors associated with the symmetric matrix A
Added:
incubator/hama/trunk/src/java/org/apache/hama/algebra/JacobiEigenValue.java
incubator/hama/trunk/src/java/org/apache/hama/io/Pair.java
incubator/hama/trunk/src/java/org/apache/hama/mapred/DummyMapper.java
Modified:
incubator/hama/trunk/CHANGES.txt
incubator/hama/trunk/src/java/org/apache/hama/AbstractMatrix.java
incubator/hama/trunk/src/java/org/apache/hama/DenseMatrix.java
incubator/hama/trunk/src/java/org/apache/hama/io/VectorUpdate.java
incubator/hama/trunk/src/test/org/apache/hama/TestDenseMatrix.java
Modified: incubator/hama/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/CHANGES.txt?rev=792423&r1=792422&r2=792423&view=diff
==============================================================================
--- incubator/hama/trunk/CHANGES.txt (original)
+++ incubator/hama/trunk/CHANGES.txt Thu Jul 9 07:09:57 2009
@@ -3,7 +3,8 @@
Trunk (unreleased changes)
NEW FEATURES
-
+ HAMA-185: Finds the eigenvalues and eigenvectors
+ associated with the symmetric matrix A (samuel)
HAMA-171: Find the maximum absolute row sum using MapReduce (edwardyoon)
HAMA-174: Compute the transpose of a matrix (edwardyoon)
HAMA-162: Add Graph using Sparse Matrix (edwardyoon)
Modified: incubator/hama/trunk/src/java/org/apache/hama/AbstractMatrix.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/AbstractMatrix.java?rev=792423&r1=792422&r2=792423&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/AbstractMatrix.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/AbstractMatrix.java Thu Jul 9 07:09:57 2009
@@ -51,6 +51,7 @@
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
+import org.apache.hama.algebra.JacobiEigenValue;
import org.apache.hama.algebra.MatrixNormMapRed;
import org.apache.hama.algebra.TransposeMap;
import org.apache.hama.algebra.TransposeReduce;
@@ -146,6 +147,16 @@
this.tableDesc.addFamily(new HColumnDescriptor(Bytes
.toBytes(Constants.BLOCK), 1, CompressionType.NONE, false, false,
Integer.MAX_VALUE, HConstants.FOREVER, false));
+ // the following families are used in JacobiEigenValue computation
+ this.tableDesc.addFamily(new HColumnDescriptor(Bytes
+ .toBytes(JacobiEigenValue.EI), 1, CompressionType.NONE, false, false,
+ Integer.MAX_VALUE, HConstants.FOREVER, false));
+ this.tableDesc.addFamily(new HColumnDescriptor(Bytes
+ .toBytes(JacobiEigenValue.EICOL), 10, CompressionType.NONE, false, false,
+ Integer.MAX_VALUE, HConstants.FOREVER, false));
+ this.tableDesc.addFamily(new HColumnDescriptor(Bytes
+ .toBytes(JacobiEigenValue.EIVEC), 10, CompressionType.NONE, false, false,
+ Integer.MAX_VALUE, HConstants.FOREVER, false));
LOG.info("Initializing the matrix storage.");
this.admin.createTable(this.tableDesc);
Modified: incubator/hama/trunk/src/java/org/apache/hama/DenseMatrix.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/DenseMatrix.java?rev=792423&r1=792422&r2=792423&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/DenseMatrix.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/DenseMatrix.java Thu Jul 9 07:09:57 2009
@@ -33,26 +33,34 @@
import org.apache.hadoop.hbase.io.Cell;
import org.apache.hadoop.hbase.io.RowResult;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.MapWritable;
+import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.SequenceFileInputFormat;
+import org.apache.hadoop.mapred.SequenceFileOutputFormat;
+import org.apache.hadoop.mapred.lib.NullOutputFormat;
import org.apache.hama.algebra.BlockMultiplyMap;
import org.apache.hama.algebra.BlockMultiplyReduce;
import org.apache.hama.algebra.DenseMatrixVectorMultMap;
import org.apache.hama.algebra.DenseMatrixVectorMultReduce;
+import org.apache.hama.algebra.JacobiEigenValue;
import org.apache.hama.algebra.RowCyclicAdditionMap;
import org.apache.hama.algebra.RowCyclicAdditionReduce;
import org.apache.hama.io.BlockID;
import org.apache.hama.io.BlockWritable;
import org.apache.hama.io.DoubleEntry;
+import org.apache.hama.io.Pair;
import org.apache.hama.io.VectorUpdate;
import org.apache.hama.mapred.CollectBlocksMapper;
+import org.apache.hama.mapred.DummyMapper;
import org.apache.hama.mapred.RandomMatrixMap;
import org.apache.hama.mapred.RandomMatrixReduce;
import org.apache.hama.mapred.VectorInputFormat;
@@ -666,4 +674,243 @@
JobManager.execute(jobConf);
}
+
+ /**
+ * Compute all the eigenvalues.
+ * Note: the eigenvalues are collected in the "eival:value" column,
+ * and the eigenvector of a given eigenvalue is collected in the
+ * "eivec:" column family of the same row. (A usage sketch follows
+ * this file's diff.)
+ *
+ * TODO: we may need to expose an interface to access the eigenvalues
+ * and eigenvectors
+ *
+ * @param loops limit on the number of rotation loops
+ * @throws IOException
+ */
+ public void jacobiEigenValue(int loops) throws IOException {
+ JobConf jobConf = new JobConf(config);
+
+ /******************************************************************
+ * Initialization
+ *
+ * An M/R job is used for initialization (such as preparing a matrix
+ * copy of the original in the "eicol:" family).
+ ******************************************************************/
+ // initialization
+ jobConf.setJobName("JacobiEigen initialization MR job" + getPath());
+
+ jobConf.setMapperClass(JacobiEigenValue.InitMapper.class);
+ jobConf.setInputFormat(VectorInputFormat.class);
+ jobConf.set(VectorInputFormat.COLUMN_LIST, Constants.COLUMN);
+
+ FileInputFormat.addInputPaths(jobConf, getPath());
+ jobConf.set(JacobiEigenValue.MATRIX, getPath());
+ jobConf.setOutputFormat(NullOutputFormat.class);
+ jobConf.setMapOutputKeyClass(IntWritable.class);
+ jobConf.setMapOutputValueClass(MapWritable.class);
+
+ JobManager.execute(jobConf);
+
+ final FileSystem fs = FileSystem.get(jobConf);
+ Pair pivotPair = new Pair();
+ DoubleWritable pivotWritable = new DoubleWritable();
+ VectorUpdate vu;
+
+ // loop
+ int size = this.getRows();
+ int state = size;
+ int pivot_row, pivot_col;
+ double pivot;
+ double s, c, t, y;
+
+ while(state != 0 && loops > 0) {
+ /******************************************************************
+ * Find the pivot and its index (pivot_row, pivot_col)
+ *
+ * An M/R job is used to scan all the "eival:ind" columns to get the
+ * max absolute value of each row, and to do a MAX aggregation of
+ * these max values to get the max value in the matrix.
+ ******************************************************************/
+ jobConf = new JobConf(config);
+ jobConf.setJobName("Find Pivot MR job" + getPath());
+
+ jobConf.setNumReduceTasks(1);
+
+ Path outDir = new Path(new Path(getType() + "_TMP_FindPivot_dir_" + System.currentTimeMillis()), "out");
+ if(fs.exists(outDir))
+ fs.delete(outDir, true);
+
+ jobConf.setMapperClass(JacobiEigenValue.PivotMapper.class);
+ jobConf.setInputFormat(JacobiEigenValue.PivotInputFormat.class);
+ jobConf.set(JacobiEigenValue.PivotInputFormat.COLUMN_LIST, JacobiEigenValue.EIIND);
+ FileInputFormat.addInputPaths(jobConf, getPath());
+ jobConf.setMapOutputKeyClass(Pair.class);
+ jobConf.setMapOutputValueClass(DoubleWritable.class);
+
+ jobConf.setOutputKeyClass(Pair.class);
+ jobConf.setOutputValueClass(DoubleWritable.class);
+ jobConf.setOutputFormat(SequenceFileOutputFormat.class);
+ FileOutputFormat.setOutputPath(jobConf, outDir);
+
+ // update the output dir of the job
+ outDir = FileOutputFormat.getOutputPath(jobConf);
+
+ JobManager.execute(jobConf);
+
+ // read outputs
+ Path inFile = new Path(outDir, "part-00000");
+ SequenceFile.Reader reader = new SequenceFile.Reader(fs, inFile, jobConf);
+ try {
+ reader.next(pivotPair, pivotWritable);
+ pivot_row = pivotPair.getRow();
+ pivot_col = pivotPair.getColumn();
+ pivot = pivotWritable.get();
+ } finally {
+ reader.close();
+ }
+ fs.delete(outDir.getParent(), true);
+
+ /******************************************************************
+ * Calculation
+ *
+ * Compute the rotation parameters of the next rotation (a standalone
+ * sketch of these formulas follows the JacobiEigenValue.java diff below).
+ ******************************************************************/
+ double e1 = BytesUtil.bytesToDouble(table.get(BytesUtil.getRowIndex(pivot_row),
+ Bytes.toBytes(JacobiEigenValue.EIVAL)).getValue());
+ double e2 = BytesUtil.bytesToDouble(table.get(BytesUtil.getRowIndex(pivot_col),
+ Bytes.toBytes(JacobiEigenValue.EIVAL)).getValue());
+
+ y = (e2 - e1) / 2;
+ t = Math.abs(y) + Math.sqrt(pivot * pivot + y * y);
+ s = Math.sqrt(pivot * pivot + t * t);
+ c = t / s;
+ s = pivot / s;
+ t = (pivot * pivot) / t;
+ if(y < 0) {
+ s = -s;
+ t = -t;
+ }
+
+ /******************************************************************
+ * Update the pivot and the eigenvalues indexed by the pivot
+ ******************************************************************/
+ vu = new VectorUpdate(pivot_row);
+ vu.put(JacobiEigenValue.EICOL, pivot_col, 0);
+ table.commit(vu.getBatchUpdate());
+
+ state = update(pivot_row, -t, state);
+ state = update(pivot_col, t, state);
+
+ /******************************************************************
+ * Rotate the matrix
+ ******************************************************************/
+ // rotation
+ jobConf = new JobConf(config);
+ jobConf.setJobName("Rotation Matrix MR job" + getPath());
+
+ jobConf.setInt(JacobiEigenValue.PIVOTROW, pivot_row);
+ jobConf.setInt(JacobiEigenValue.PIVOTCOL, pivot_col);
+ jobConf.set(JacobiEigenValue.PIVOTSIN, String.valueOf(s));
+ jobConf.set(JacobiEigenValue.PIVOTCOS, String.valueOf(c));
+
+ jobConf.setMapperClass(DummyMapper.class);
+ jobConf.setInputFormat(JacobiEigenValue.RotationInputFormat.class);
+ jobConf.set(JacobiEigenValue.RotationInputFormat.COLUMN_LIST, JacobiEigenValue.EIIND);
+ FileInputFormat.addInputPaths(jobConf, getPath());
+ jobConf.setMapOutputKeyClass(NullWritable.class);
+ jobConf.setMapOutputValueClass(NullWritable.class);
+ jobConf.setOutputFormat(NullOutputFormat.class);
+
+ JobManager.execute(jobConf);
+
+ // rotate eigenvectors
+ LOG.info("rotating eigenvector");
+ for(int i = 0; i < size; i++) {
+ e1 = BytesUtil.bytesToDouble(table.get(BytesUtil.getRowIndex(pivot_row),
+ Bytes.toBytes(JacobiEigenValue.EIVEC + i)).getValue());
+ e2 = BytesUtil.bytesToDouble(table.get(BytesUtil.getRowIndex(pivot_col),
+ Bytes.toBytes(JacobiEigenValue.EIVEC + i)).getValue());
+
+ vu = new VectorUpdate(pivot_row);
+ vu.put(JacobiEigenValue.EIVEC, i, c * e1 - s * e2);
+ table.commit(vu.getBatchUpdate());
+
+ vu = new VectorUpdate(pivot_col);
+ vu.put(JacobiEigenValue.EIVEC, i, s * e1 + c * e2);
+ table.commit(vu.getBatchUpdate());
+ }
+
+ LOG.info("update index...");
+ // update index array
+ maxind(pivot_row, size);
+ maxind(pivot_col, size);
+
+ loops--;
+ }
+ }
+
+ void maxind(int row, int size) throws IOException {
+ int m = row + 1;
+ if(row + 2 < size) {
+ double max = BytesUtil.bytesToDouble(table.get(BytesUtil.getRowIndex(row),
+ Bytes.toBytes(JacobiEigenValue.EICOL + m)).getValue());
+ double val;
+ for(int i=row + 2; i<size; i++) {
+ val = BytesUtil.bytesToDouble(table.get(BytesUtil.getRowIndex(row),
+ Bytes.toBytes(JacobiEigenValue.EICOL + i)).getValue());
+ if(Math.abs(val) > Math.abs(max)) {
+ m = i;
+ max = val;
+ }
+ }
+ }
+
+ VectorUpdate vu = new VectorUpdate(row);
+ vu.put(JacobiEigenValue.EIIND, m);
+ table.commit(vu.getBatchUpdate());
+ }
+
+ int update(int row, double value, int state) throws IOException {
+ double e = BytesUtil.bytesToDouble(table.get(BytesUtil.getRowIndex(row),
+ Bytes.toBytes(JacobiEigenValue.EIVAL)).getValue());
+ int changed = BytesUtil.bytesToInt(table.get(BytesUtil.getRowIndex(row),
+ Bytes.toBytes(JacobiEigenValue.EICHANGED)).getValue());
+ double y = e;
+ e += value;
+
+ VectorUpdate vu = new VectorUpdate(row);
+ vu.put(JacobiEigenValue.EIVAL, e);
+ if(changed == 1 && (Math.abs(y - e) < .0000001)) { // i.e. y == e, within tolerance
+ changed = 0;
+ vu.put(JacobiEigenValue.EICHANGED, changed);
+ state--;
+ } else if(changed == 0 && (Math.abs(y - e) > .0000001)) {
+ changed = 1;
+ vu.put(JacobiEigenValue.EICHANGED, changed);
+ state++;
+ }
+ table.commit(vu.getBatchUpdate());
+ return state;
+ }
+
+ // for test
+ boolean verifyEigenValue(double[] e, double[][] E) throws IOException {
+ boolean success = true;
+ double e1, ev;
+ for(int i=0; i<e.length; i++) {
+ e1 = BytesUtil.bytesToDouble(table.get(BytesUtil.getRowIndex(i),
+ Bytes.toBytes(JacobiEigenValue.EIVAL)).getValue());
+ success &= ((Math.abs(e1 - e[i]) < .0000001));
+ if(!success) return success;
+
+ for(int j=0; j<E[i].length; j++) {
+ ev = BytesUtil.bytesToDouble(table.get(BytesUtil.getRowIndex(i),
+ Bytes.toBytes(JacobiEigenValue.EIVEC + j)).getValue());
+ success &= ((Math.abs(ev - E[i][j]) < .0000001));
+ if(!success) return success;
+ }
+ }
+ return success;
+ }
}
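
For context, a minimal usage sketch of the new method (not part of this patch; it assumes a running HBase-backed Hama cluster and, since the Jacobi method is only defined for symmetric matrices, a symmetric input):

    import org.apache.hama.DenseMatrix;
    import org.apache.hama.HamaConfiguration;

    HamaConfiguration conf = new HamaConfiguration();
    // assume the random matrix is made symmetric before use
    DenseMatrix A = DenseMatrix.random(conf, 4, 4);
    A.jacobiEigenValue(10); // run at most 10 rotation loops
    // eigenvalues land in the "eival:value" column; the eigenvector for
    // row i's eigenvalue lands in the "eivec:" family of the same row

As the TODO above notes, results are read back from the HBase table directly for now; verifyEigenValue() shows the access pattern.
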
Added: incubator/hama/trunk/src/java/org/apache/hama/algebra/JacobiEigenValue.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/algebra/JacobiEigenValue.java?rev=792423&view=auto
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/algebra/JacobiEigenValue.java (added)
+++ incubator/hama/trunk/src/java/org/apache/hama/algebra/JacobiEigenValue.java Thu Jul 9 07:09:57 2009
@@ -0,0 +1,583 @@
+package org.apache.hama.algebra;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.UnknownScannerException;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.io.Cell;
+import org.apache.hadoop.hbase.io.RowResult;
+import org.apache.hadoop.hbase.mapred.TableSplit;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.MapWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobConfigurable;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hama.Constants;
+import org.apache.hama.io.DoubleEntry;
+import org.apache.hama.io.Pair;
+import org.apache.hama.io.VectorUpdate;
+import org.apache.hama.mapred.HTableInputFormatBase;
+import org.apache.hama.mapred.HTableRecordReaderBase;
+import org.apache.hama.util.BytesUtil;
+
+/**
+ * A catalog class collecting all the M/R classes used to compute a
+ * matrix's eigenvalues.
+ */
+public class JacobiEigenValue {
+
+ /** a copy of the original matrix, stored in the "eicol" family */
+ public static final String EICOL = "eicol:";
+ /** a column family collecting all values and statuses used during computation */
+ public static final String EI = "eival:";
+ /** the column collecting all the eigenvalues */
+ public static final String EIVAL = EI + "value";
+ /** a column indicating whether an eigenvalue has changed */
+ public static final String EICHANGED = EI + "changed";
+ /** a column holding the index of the max absolute value in each row */
+ public static final String EIIND = EI + "ind";
+ /** a matrix collecting all the eigenvectors */
+ public static final String EIVEC = "eivec:";
+ public static final String MATRIX = "hama.jacobieigenvalue.matrix";
+ /** parameters for the pivot */
+ public static final String PIVOTROW = "hama.jacobi.pivot.row";
+ public static final String PIVOTCOL = "hama.jacobi.pivot.col";
+ public static final String PIVOTSIN = "hama.jacobi.pivot.sin";
+ public static final String PIVOTCOS = "hama.jacobi.pivot.cos";
+
+ static final Log LOG = LogFactory.getLog(JacobiEigenValue.class);
+
+ /**
+ * The matrix will be modified while computing its eigenvalues, so a new
+ * matrix is created to keep the original one intact. To reduce network
+ * transfer, we copy the "column" family of the original matrix into an
+ * "eicol" family; all subsequent modifications are done on the "eicol"
+ * family.
+ *
+ * The output eigenvector matrix "eivec", the output eigenvalue array
+ * "eival:value", and the temporary status arrays "eival:changed" and
+ * "eival:ind" are also created.
+ *
+ * Also, "eival:state" records the rotation state of the matrix.
+ */
+ public static class InitMapper extends MapReduceBase implements
+ Mapper<IntWritable, MapWritable, NullWritable, NullWritable> {
+
+ HTable table;
+
+ @Override
+ public void configure(JobConf job) {
+ String tableName = job.get(MATRIX, "");
+ try {
+ table = new HTable(new HBaseConfiguration(job), tableName);
+ } catch (IOException e) {
+ LOG.error("Unable to open table " + tableName, e);
+ }
+ }
+
+ @Override
+ public void map(IntWritable key, MapWritable value,
+ OutputCollector<NullWritable, NullWritable> collector, Reporter reporter)
+ throws IOException {
+ int row, col;
+ row = key.get();
+ VectorUpdate vu = new VectorUpdate(row);
+
+ double val;
+ double maxVal = Double.MIN_VALUE;
+ int maxInd = row + 1;
+
+ boolean init = true;
+ for (Map.Entry<Writable, Writable> e : value.entrySet()) {
+ val = ((DoubleEntry) e.getValue()).getValue();
+ col = ((IntWritable) e.getKey()).get();
+ // copy the original matrix to "EICOL" family
+ vu.put(JacobiEigenValue.EICOL, col, val);
+ // make the "EIVEC" a dialog matrix
+ vu.put(JacobiEigenValue.EIVEC, col, col == row ? 1 : 0);
+ if (col == row) {
+ vu.put(JacobiEigenValue.EIVAL, val);
+ }
+ // find the max index
+ if (col > row) {
+ if (init) {
+ maxInd = col;
+ maxVal = val;
+ init = false;
+ } else {
+ if (Math.abs(val) > Math.abs(maxVal)) {
+ maxVal = val;
+ maxInd = col;
+ }
+ }
+ }
+ }
+ // index array
+ vu.put(JacobiEigenValue.EIIND, maxInd);
+ // the changed flags are set to true during initialization
+ vu.put(JacobiEigenValue.EICHANGED, 1);
+
+ table.commit(vu.getBatchUpdate());
+ }
+
+ }
+
+ /**
+ * PivotInputFormat & PivotMapper & PivotReducer are used to find the pivot in
+ * a matrix
+ */
+ public static class PivotInputFormat extends HTableInputFormatBase implements
+ InputFormat<Pair, DoubleWritable>, JobConfigurable {
+
+ private PivotRecordReader tableRecordReader;
+
+ protected static class PivotRecordReader extends HTableRecordReaderBase
+ implements RecordReader<Pair, DoubleWritable> {
+
+ private int totalRows;
+ private int processedRows;
+ private int size;
+ boolean mocked = true;
+
+ @Override
+ public void init() throws IOException {
+ super.init();
+
+ Cell rows = htable.get(Constants.METADATA, Constants.METADATA_ROWS);
+ size = (rows != null) ? BytesUtil.bytesToInt(rows.getValue()) : 0;
+
+ if (endRow.length == 0) { // the last split, we don't know the end row
+ totalRows = 0; // so we just skip it.
+ } else {
+ if (startRow.length == 0) { // the first split, start row is 0
+ totalRows = BytesUtil.bytesToInt(endRow);
+ } else {
+ totalRows = BytesUtil.bytesToInt(endRow)
+ - BytesUtil.bytesToInt(startRow);
+ }
+ }
+ processedRows = 0;
+ LOG.info("Split (" + Bytes.toString(startRow) + ", "
+ + Bytes.toString(endRow) + ") -> " + totalRows);
+ }
+
+ /**
+ * @return Pair
+ *
+ * @see org.apache.hadoop.mapred.RecordReader#createKey()
+ */
+ public Pair createKey() {
+ return new Pair();
+ }
+
+ /**
+ * @return DoubleWritable
+ *
+ * @see org.apache.hadoop.mapred.RecordReader#createValue()
+ */
+ public DoubleWritable createValue() {
+ return new DoubleWritable();
+ }
+
+ /**
+ * @param key Pair as input key.
+ * @param value DoubleWritable as input value
+ *
+ * Converts Scanner.next() to Pair, DoubleWritable
+ *
+ * @return true if there was more data
+ * @throws IOException
+ */
+ public boolean next(Pair key, DoubleWritable value) throws IOException {
+ RowResult result;
+ try {
+ result = this.scanner.next();
+ } catch (UnknownScannerException e) {
+ LOG.debug("recovered from " + StringUtils.stringifyException(e));
+ restart(lastRow);
+ this.scanner.next(); // skip presumed already mapped row
+ result = this.scanner.next();
+ }
+
+ boolean hasMore = result != null && result.size() > 0;
+ if (hasMore) {
+ byte[] row = result.getRow();
+ int rowId = BytesUtil.bytesToInt(row);
+ if (rowId == size - 1) { // skip the last row
+ if (mocked) {
+ key.set(Integer.MAX_VALUE, Integer.MAX_VALUE);
+ mocked = false;
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ byte[] col = result.get(EIIND).getValue();
+ int colId = BytesUtil.bytesToInt(col);
+ double val = 0;
+
+ // get (rowId, colId)'s value
+ Cell cell = htable.get(BytesUtil.getRowIndex(rowId), Bytes
+ .toBytes(EICOL + colId));
+ if (cell != null && cell.getValue() != null) {
+ val = BytesUtil.bytesToDouble(cell.getValue());
+ }
+
+ key.set(rowId, colId);
+ value.set(val);
+
+ lastRow = row;
+ processedRows++;
+ } else {
+ if (mocked) {
+ key.set(Integer.MAX_VALUE, Integer.MAX_VALUE);
+ mocked = false;
+ return true;
+ } else {
+ return false;
+ }
+ }
+ return hasMore;
+ }
+
+ @Override
+ public float getProgress() {
+ if (totalRows <= 0) {
+ return 0;
+ } else {
+ return Math.min(1.0f, processedRows / (float) totalRows);
+ }
+ }
+
+ }
+
+ @Override
+ public RecordReader<Pair, DoubleWritable> getRecordReader(InputSplit split,
+ JobConf conf, Reporter reporter) throws IOException {
+ TableSplit tSplit = (TableSplit) split;
+ PivotRecordReader trr = this.tableRecordReader;
+ // if no table record reader was provided use default
+ if (trr == null) {
+ trr = new PivotRecordReader();
+ }
+ trr.setStartRow(tSplit.getStartRow());
+ trr.setEndRow(tSplit.getEndRow());
+ trr.setHTable(this.table);
+ trr.setInputColumns(this.inputColumns);
+ trr.setRowFilter(this.rowFilter);
+ trr.init();
+ return trr;
+ }
+
+ protected void setTableRecordReader(PivotRecordReader tableRecordReader) {
+ this.tableRecordReader = tableRecordReader;
+ }
+
+ }
+
+ // find the pivot of the matrix
+ public static class PivotMapper extends MapReduceBase implements
+ Mapper<Pair, DoubleWritable, Pair, DoubleWritable> {
+
+ private double max = 0;
+ private Pair pair = new Pair(0, 0);
+ private Pair dummyPair = new Pair(Integer.MAX_VALUE, Integer.MAX_VALUE);
+ private DoubleWritable dummyVal = new DoubleWritable(0.0);
+
+ @Override
+ public void map(Pair key, DoubleWritable value,
+ OutputCollector<Pair, DoubleWritable> collector, Reporter reporter)
+ throws IOException {
+ if (key.getRow() != Integer.MAX_VALUE) {
+ if (Math.abs(value.get()) > Math.abs(max)) {
+ pair.set(key.getRow(), key.getColumn());
+ max = value.get();
+ }
+ } else {
+ collector.collect(pair, new DoubleWritable(max));
+ collector.collect(dummyPair, dummyVal);
+ }
+ }
+
+ }
+
+ public static class PivotReducer extends MapReduceBase implements
+ Reducer<Pair, DoubleWritable, Pair, DoubleWritable> {
+
+ private double max = 0;
+ private Pair pair = new Pair(0, 0);
+
+ @Override
+ public void reduce(Pair key, Iterator<DoubleWritable> values,
+ OutputCollector<Pair, DoubleWritable> collector, Reporter reporter)
+ throws IOException {
+ double val;
+ if (key.getRow() != Integer.MAX_VALUE) {
+ val = values.next().get();
+ if (Math.abs(val) > Math.abs(max)) {
+ pair.set(key.getRow(), key.getColumn());
+ max = val;
+ }
+ } else {
+ collector.collect(pair, new DoubleWritable(max));
+ }
+ }
+
+ }
+
+ /**
+ * Tricky here! We rotate the matrix while scanning it and update it in
+ * place, so all we need is a RotationRecordReader that scans the matrix
+ * and performs the rotation; the mapper is just a dummy mapper.
+ */
+ public static class RotationInputFormat extends HTableInputFormatBase
+ implements InputFormat<NullWritable, NullWritable>, JobConfigurable {
+
+ private RotationRecordReader tableRecordReader;
+
+ int pivot_row, pivot_col;
+ double pivot_cos, pivot_sin;
+
+ public void configure(JobConf job) {
+ super.configure(job);
+ pivot_row = job.getInt(PIVOTROW, -1);
+ pivot_col = job.getInt(PIVOTCOL, -1);
+ pivot_sin = Double.parseDouble(job.get(PIVOTSIN));
+ pivot_cos = Double.parseDouble(job.get(PIVOTCOS));
+ }
+
+ protected static class RotationRecordReader extends HTableRecordReaderBase
+ implements RecordReader<NullWritable, NullWritable> {
+
+ private int totalRows;
+ private int processedRows;
+ int startRowId, endRowId = -1;
+ int size;
+
+ int pivotrow, pivotcol;
+ byte[] prow, pcol;
+ double pivotcos, pivotsin;
+
+ public RotationRecordReader(int pr, int pc, double psin, double pcos) {
+ super();
+ pivotrow = pr;
+ pivotcol = pc;
+ pivotsin = psin;
+ pivotcos = pcos;
+ prow = Bytes.toBytes(pivotrow);
+ pcol = Bytes.toBytes(pivotcol);
+ LOG.info("pivot row: " + pivotrow);
+ LOG.info("pivot col: " + pivotcol);
+ }
+
+ @Override
+ public void init() throws IOException {
+ super.init();
+
+ Cell rows = htable.get(Constants.METADATA, Constants.METADATA_ROWS);
+ size = (rows != null) ? BytesUtil.bytesToInt(rows.getValue()) : 0;
+
+ if (endRow.length == 0) { // the last split, we don't know the end row
+ totalRows = 0; // so we just skip it.
+ if (startRow.length == 0)
+ startRowId = 0;
+ else
+ startRowId = BytesUtil.bytesToInt(startRow);
+ endRowId = -1;
+ } else {
+ if (startRow.length == 0) { // the first split, start row is 0
+ totalRows = BytesUtil.bytesToInt(endRow);
+ startRowId = 0;
+ endRowId = totalRows;
+ } else {
+ startRowId = BytesUtil.bytesToInt(startRow);
+ endRowId = BytesUtil.bytesToInt(endRow);
+ totalRows = endRowId - startRowId;
+ }
+ }
+ processedRows = 0;
+ LOG.info("Split (" + startRowId + ", " + endRowId + ") -> " + totalRows);
+ }
+
+ /**
+ * @return NullWritable
+ *
+ * @see org.apache.hadoop.mapred.RecordReader#createKey()
+ */
+ public NullWritable createKey() {
+ return NullWritable.get();
+ }
+
+ /**
+ * @return NullWritable
+ *
+ * @see org.apache.hadoop.mapred.RecordReader#createValue()
+ */
+ public NullWritable createValue() {
+ return NullWritable.get();
+ }
+
+ /**
+ * @param key NullWritable as input key.
+ * @param value NullWritable as input value
+ *
+ * Converts Scanner.next() to NullWritable, NullWritable
+ *
+ * @return true if there was more data
+ * @throws IOException
+ */
+ public boolean next(NullWritable key, NullWritable value)
+ throws IOException {
+ RowResult result;
+ try {
+ result = this.scanner.next();
+ } catch (UnknownScannerException e) {
+ LOG.debug("recovered from " + StringUtils.stringifyException(e));
+ restart(lastRow);
+ this.scanner.next(); // skip presumed already mapped row
+ result = this.scanner.next();
+ }
+
+ double s1, s2;
+ VectorUpdate bu;
+ boolean hasMore = result != null && result.size() > 0;
+ if (hasMore) {
+ byte[] row = result.getRow();
+ int rowId = BytesUtil.bytesToInt(row);
+ if (rowId < pivotrow) {
+ s1 = BytesUtil.bytesToDouble(htable.get(
+ BytesUtil.getRowIndex(rowId),
+ Bytes.toBytes(JacobiEigenValue.EICOL + pivotrow)).getValue());
+ s2 = BytesUtil.bytesToDouble(htable.get(
+ BytesUtil.getRowIndex(rowId),
+ Bytes.toBytes(JacobiEigenValue.EICOL + pivotcol)).getValue());
+
+ bu = new VectorUpdate(rowId);
+ bu.put(EICOL, pivotrow, pivotcos * s1 - pivotsin * s2);
+ bu.put(EICOL, pivotcol, pivotsin * s1 + pivotcos * s2);
+
+ htable.commit(bu.getBatchUpdate());
+ } else if (rowId == pivotrow) {
+ return true;
+ } else if (rowId < pivotcol) {
+ s1 = BytesUtil.bytesToDouble(htable.get(
+ BytesUtil.getRowIndex(pivotrow), Bytes.toBytes(EICOL + rowId))
+ .getValue());
+ s2 = BytesUtil.bytesToDouble(htable.get(
+ BytesUtil.getRowIndex(rowId), Bytes.toBytes(EICOL + pivotcol))
+ .getValue());
+
+ bu = new VectorUpdate(rowId);
+ bu.put(EICOL, pivotcol, pivotsin * s1 + pivotcos * s2);
+ htable.commit(bu.getBatchUpdate());
+
+ bu = new VectorUpdate(pivotrow);
+ bu.put(EICOL, rowId, pivotcos * s1 - pivotsin * s2);
+ htable.commit(bu.getBatchUpdate());
+ } else if (rowId == pivotcol) {
+ for (int i = pivotcol + 1; i < size; i++) {
+ s1 = BytesUtil.bytesToDouble(htable.get(
+ BytesUtil.getRowIndex(pivotrow), Bytes.toBytes(EICOL + i))
+ .getValue());
+ s2 = BytesUtil.bytesToDouble(htable.get(
+ BytesUtil.getRowIndex(pivotcol), Bytes.toBytes(EICOL + i))
+ .getValue());
+
+ bu = new VectorUpdate(pivotcol);
+ bu.put(EICOL, i, pivotsin * s1 + pivotcos * s2);
+ htable.commit(bu.getBatchUpdate());
+
+ bu = new VectorUpdate(pivotrow);
+ bu.put(EICOL, i, pivotcos * s1 - pivotsin * s2);
+ htable.commit(bu.getBatchUpdate());
+ }
+ } else { // rowId > pivotcol
+ return false;
+ }
+
+ lastRow = row;
+ processedRows++;
+ }
+ return hasMore;
+ }
+
+ @Override
+ public float getProgress() {
+ if (totalRows <= 0) {
+ return 0;
+ } else {
+ return Math.min(1.0f, processedRows / (float) totalRows);
+ }
+ }
+
+ }
+
+ public InputSplit[] getSplits(JobConf job, int numSplits)
+ throws IOException {
+ InputSplit[] splits = super.getSplits(job, numSplits);
+ List<InputSplit> newSplits = new ArrayList<InputSplit>();
+ for (InputSplit split : splits) {
+ TableSplit ts = (TableSplit) split;
+ byte[] row = ts.getStartRow();
+ if (row.length == 0) // the first split
+ newSplits.add(split);
+ else {
+ if (BytesUtil.bytesToInt(ts.getStartRow()) < pivot_col) {
+ newSplits.add(split);
+ }
+ }
+ }
+
+ return newSplits.toArray(new InputSplit[newSplits.size()]);
+ }
+
+ @Override
+ public RecordReader<NullWritable, NullWritable> getRecordReader(
+ InputSplit split, JobConf conf, Reporter reporter) throws IOException {
+ TableSplit tSplit = (TableSplit) split;
+ RotationRecordReader trr = this.tableRecordReader;
+ // if no table record reader was provided use default
+ if (trr == null) {
+ trr = new RotationRecordReader(pivot_row, pivot_col, pivot_sin,
+ pivot_cos);
+ }
+ trr.setStartRow(tSplit.getStartRow());
+ trr.setEndRow(tSplit.getEndRow());
+ trr.setHTable(this.table);
+ trr.setInputColumns(this.inputColumns);
+ trr.setRowFilter(this.rowFilter);
+ trr.init();
+ return trr;
+ }
+
+ protected void setTableRecordReader(RotationRecordReader tableRecordReader) {
+ this.tableRecordReader = tableRecordReader;
+ }
+
+ }
+}
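
The rotation parameters computed in DenseMatrix.jacobiEigenValue() and in the test below follow the classical Jacobi scheme; a standalone sketch of that calculation (not part of this patch):

    // given pivot p = S[k][l] and eigenvalue estimates ek, el, returns
    // {cos, sin, t}: the rotation zeroes S[k][l], and the estimates are
    // shifted by -t and +t respectively
    static double[] rotationParams(double p, double ek, double el) {
      double y = (el - ek) / 2;
      double t = Math.abs(y) + Math.sqrt(p * p + y * y);
      double s = Math.sqrt(p * p + t * t);
      double c = t / s;   // cos(theta)
      s = p / s;          // sin(theta)
      t = (p * p) / t;    // eigenvalue shift
      if (y < 0) { s = -s; t = -t; }
      return new double[] { c, s, t };
    }

Sanity check: for A = [[2, 1], [1, 2]] the pivot is p = 1 with ek = el = 2, giving t = 1, so the estimates become 2 - 1 = 1 and 2 + 1 = 3, exactly the eigenvalues of A.
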
Added: incubator/hama/trunk/src/java/org/apache/hama/io/Pair.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/io/Pair.java?rev=792423&view=auto
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/io/Pair.java (added)
+++ incubator/hama/trunk/src/java/org/apache/hama/io/Pair.java Thu Jul 9 07:09:57 2009
@@ -0,0 +1,65 @@
+package org.apache.hama.io;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.WritableComparable;
+
+/** A Pair represents a (row, column) index pair. */
+public class Pair implements WritableComparable<Pair> {
+
+ int row, col;
+
+ public Pair() {}
+
+ public Pair(int row_, int col_) {
+ set(row_, col_);
+ }
+
+ public int getRow() { return row; }
+ public int getColumn() { return col; }
+
+ public void setRow(int row_) { row = row_; }
+ public void setColumn(int col_) { col = col_; }
+ public void set(int row_, int col_) {
+ row = row_;
+ col = col_;
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ row = in.readInt();
+ col = in.readInt();
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ out.writeInt(row);
+ out.writeInt(col);
+ }
+
+ @Override
+ public int compareTo(Pair p) {
+ return row == p.row ? col - p.col : row - p.row;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (!(obj instanceof Pair))
+ return false;
+ return compareTo((Pair) obj) == 0;
+ }
+
+ @Override
+ public int hashCode() {
+ return row;
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder();
+ sb.append('(').append(row).append(',').append(col).append(')');
+ return sb.toString();
+ }
+
+}
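
A quick round-trip sketch for the new Writable (hypothetical, not part of this patch), using Hadoop's DataOutputBuffer and DataInputBuffer:

    import org.apache.hadoop.io.DataInputBuffer;
    import org.apache.hadoop.io.DataOutputBuffer;
    import org.apache.hama.io.Pair;

    DataOutputBuffer out = new DataOutputBuffer();
    new Pair(2, 5).write(out);               // serialize (row, col)

    DataInputBuffer in = new DataInputBuffer();
    in.reset(out.getData(), out.getLength());
    Pair p = new Pair();
    p.readFields(in);                        // p.toString() -> "(2,5)"

Pair is the map output key of the Find Pivot job, so compareTo() drives the shuffle ordering; the (Integer.MAX_VALUE, Integer.MAX_VALUE) mock key always sorts last.
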
Modified: incubator/hama/trunk/src/java/org/apache/hama/io/VectorUpdate.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/io/VectorUpdate.java?rev=792423&r1=792422&r2=792423&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/io/VectorUpdate.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/io/VectorUpdate.java Thu Jul 9 07:09:57 2009
@@ -50,6 +50,20 @@
this.batchUpdate.put(BytesUtil.getColumnIndex(j), BytesUtil
.doubleToBytes(value));
}
+
+ /**
+ * Put the value in column "cfName + j"
+ * @param cfName the column family name
+ * @param j the column index
+ * @param value the value to store
+ */
+ public void put(String cfName, int j, double value) {
+ this.batchUpdate.put(Bytes.toBytes(cfName + j), Bytes.toBytes(value));
+ }
+
+ public void put(String name, double value) {
+ this.batchUpdate.put(Bytes.toBytes(name), Bytes.toBytes(value));
+ }
public void put(int j, String name) {
this.batchUpdate.put(Bytes.toBytes((Constants.ATTRIBUTE + j)), Bytes
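
For reference, a hedged sketch of how jacobiEigenValue() uses the two new overloads (pivotRow, pivotCol, and table are placeholders here):

    VectorUpdate vu = new VectorUpdate(pivotRow);
    vu.put(JacobiEigenValue.EICOL, pivotCol, 0);  // column "eicol:<pivotCol>"
    vu.put(JacobiEigenValue.EIVAL, 1.5);          // column "eival:value"
    table.commit(vu.getBatchUpdate());
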
Added: incubator/hama/trunk/src/java/org/apache/hama/mapred/DummyMapper.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/mapred/DummyMapper.java?rev=792423&view=auto
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/mapred/DummyMapper.java (added)
+++ incubator/hama/trunk/src/java/org/apache/hama/mapred/DummyMapper.java Thu Jul 9 07:09:57 2009
@@ -0,0 +1,20 @@
+package org.apache.hama.mapred;
+
+import java.io.IOException;
+
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+
+/** A no-op mapper that consumes its input and emits nothing. */
+public class DummyMapper<K, V>
+ extends MapReduceBase implements Mapper<K, V, K, V> {
+
+ /** Does nothing; any real work happens in the input format's record reader. */
+ public void map(K key, V val,
+ OutputCollector<K, V> output, Reporter reporter)
+ throws IOException {
+ // do nothing
+ }
+}
\ No newline at end of file
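
DummyMapper exists purely to give the rotation job a mapper; the wiring in DenseMatrix.jacobiEigenValue() above boils down to the following sketch (jobConf assumed configured as in that method):

    jobConf.setMapperClass(DummyMapper.class);        // no-op map function
    jobConf.setInputFormat(JacobiEigenValue.RotationInputFormat.class);
    jobConf.setMapOutputKeyClass(NullWritable.class);
    jobConf.setMapOutputValueClass(NullWritable.class);
    jobConf.setOutputFormat(NullOutputFormat.class);  // nothing is emitted
    // every matrix update happens inside RotationRecordReader.next()
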
Modified: incubator/hama/trunk/src/test/org/apache/hama/TestDenseMatrix.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/test/org/apache/hama/TestDenseMatrix.java?rev=792423&r1=792422&r2=792423&view=diff
==============================================================================
--- incubator/hama/trunk/src/test/org/apache/hama/TestDenseMatrix.java (original)
+++ incubator/hama/trunk/src/test/org/apache/hama/TestDenseMatrix.java Thu Jul 9 07:09:57 2009
@@ -43,7 +43,7 @@
private static Matrix m1;
private static Matrix m2;
private static Matrix m3;
- private static Matrix m4;
+ private static Matrix m4, m5;
private final static String aliase1 = "matrix_aliase_A";
private final static String aliase2 = "matrix_aliase_B";
private static HamaConfiguration conf;
@@ -64,6 +64,7 @@
m2 = DenseMatrix.random(hCluster.getConf(), SIZE, SIZE);
m3 = DenseMatrix.random(hCluster.getConf(), SIZE, SIZE);
m4 = DenseMatrix.random(hCluster.getConf(), SIZE-2, SIZE-2);
+ m5 = DenseMatrix.random(hCluster.getConf(), SIZE, SIZE);
}
protected void tearDown() {
@@ -309,7 +310,106 @@
i++;
}
}
-
+
+ public void testJacobiEigenValue() throws IOException {
+ // copy Matrix m5 to the array
+ double[][] S = new double[SIZE][SIZE];
+
+ for (int i = 0; i < SIZE; i++) {
+ for (int j = 0; j < SIZE; j++) {
+ S[i][j] = m5.get(i, j);
+ }
+ }
+
+ // do the M/R Jacobi eigenvalue computation
+ DenseMatrix dm = (DenseMatrix)m5;
+ dm.jacobiEigenValue(3);
+
+ // do the serial Jacobi eigenvalue computation over the S array
+ int i, j, k, l, m, state;
+ double s, c, t, p, y;
+ double e1, e2;
+ // index array
+ int[] ind = new int[SIZE];
+ boolean[] changed = new boolean[SIZE];
+
+ // output
+ double[] e = new double[SIZE];
+ double[][] E = new double[SIZE][SIZE];
+
+ // init e & E; ind & changed
+ for(i=0; i<SIZE; i++) {
+ for(j=0; j<SIZE; j++) {
+ E[i][j] = 0;
+ }
+ E[i][i] = 1;
+ }
+
+ state = SIZE;
+
+ for(i=0; i<SIZE; i++) {
+ ind[i] = maxind(S, i, SIZE);
+ e[i] = S[i][i];
+ changed[i] = true;
+ }
+
+ int loops = 3;
+ // next rotation
+ while(state != 0 && loops > 0) {
+ // find the index (k, l) of the pivot p
+ m = 0;
+ for(k = 1; k <= SIZE-2; k++) {
+ if(Math.abs(S[m][ind[m]]) < Math.abs(S[k][ind[k]])) {
+ m = k;
+ }
+ }
+
+ k = m; l = ind[m]; p = S[k][l];
+
+ // calculate c = cos, s = sin
+ y = (e[l] - e[k]) / 2;
+ t = Math.abs(y) + Math.sqrt(p * p + y * y);
+ s = Math.sqrt(p * p + t * t);
+ c = t / s;
+ s = p / s;
+ t = (p * p) / t;
+ if(y < 0) {
+ s = -s;
+ t = -t;
+ }
+
+ S[k][l] = 0.0;
+ state = update(e, changed, k, -t, state);
+ state = update(e, changed, l, t, state);
+
+ for(i = 0; i <= k-1; i++)
+ rotate(S, i, k, i, l, c, s);
+
+ for(i = l+1; i < SIZE; i++)
+ rotate(S, k, i, l, i, c, s);
+
+ for(i = k+1; i <= l-1; i++)
+ rotate(S, k, i, i, l, c, s);
+
+ // rotate eigenvectors
+ for(i = 0; i < SIZE; i++) {
+ e1 = E[k][i];
+ e2 = E[l][i];
+
+ E[k][i] = c * e1 - s * e2;
+ E[l][i] = s * e1 + c * e2;
+ }
+
+ ind[k] = maxind(S, k, SIZE);
+ ind[l] = maxind(S, l, SIZE);
+
+ loops--;
+ }
+
+ // verify the results
+ assertTrue(dm.verifyEigenValue(e, E));
+ }
+
public void testEnsureForAddition() {
try {
m1.add(m4);
@@ -432,10 +532,41 @@
for (int i = 0; i < SIZE; i++) {
for (int j = 0; j < SIZE; j++) {
- double gap = (c[i][j] - result.get(i, j));
- assertTrue(gap < 0.000001 && gap > -0.000001);
+ assertTrue((Math.abs(c[i][j] - result.get(i, j)) < .0000001));
}
}
}
+
+
+ // index of the largest off-diagonal element in row k
+ int maxind(double[][] S, int row, int size) {
+ int m = row + 1;
+ for(int i=row + 2; i<size; i++) {
+ if(Math.abs(S[row][i]) > Math.abs(S[row][m]))
+ m = i;
+ }
+ return m;
+ }
+
+ int update(double[] e, boolean[] changed, int row, double value, int state) {
+ double y = e[row];
+ e[row] += value;
+
+ if(changed[row] && y == e[row]) {
+ changed[row] = false;
+ return state - 1;
+ } else if(!changed[row] && y != e[row]) {
+ changed[row] = true;
+ return state + 1;
+ } else
+ return state;
+ }
+
+ void rotate(double[][] S, int k, int l, int i, int j, double c, double s) {
+ double s1 = S[k][l], s2 = S[i][j];
+ S[k][l] = c * s1 - s * s2;
+ S[i][j] = s * s1 + c * s2;
+ }
+
}