You are viewing a plain text version of this content. The canonical link for it was a hyperlink that is not preserved in this plain-text rendering.
Posted to commits@hama.apache.org by ed...@apache.org on 2009/10/29 07:13:52 UTC
svn commit: r830849 - in /incubator/hama/trunk/src: java/org/apache/hama/
java/org/apache/hama/graph/ java/org/apache/hama/io/
java/org/apache/hama/mapred/ java/org/apache/hama/mapreduce/
java/org/apache/hama/matrix/ java/org/apache/hama/matrix/algebra...
Author: edwardyoon
Date: Thu Oct 29 06:13:51 2009
New Revision: 830849
URL: http://svn.apache.org/viewvc?rev=830849&view=rev
Log:
Replacement of JacobiEigenValueMap/Reduce
Added:
incubator/hama/trunk/src/java/org/apache/hama/mapreduce/DummyMapper.java
incubator/hama/trunk/src/java/org/apache/hama/mapreduce/PivotInputFormat.java
incubator/hama/trunk/src/java/org/apache/hama/mapreduce/RotationInputFormat.java
incubator/hama/trunk/src/java/org/apache/hama/mapreduce/package.html
- copied unchanged from r829698, incubator/hama/trunk/src/java/org/apache/hama/mapred/package.html
incubator/hama/trunk/src/java/org/apache/hama/matrix/algebra/JacobiInitMap.java
incubator/hama/trunk/src/java/org/apache/hama/matrix/algebra/PivotMap.java
Removed:
incubator/hama/trunk/src/java/org/apache/hama/mapred/
incubator/hama/trunk/src/java/org/apache/hama/matrix/algebra/JacobiEigenValue.java
Modified:
incubator/hama/trunk/src/java/org/apache/hama/Constants.java
incubator/hama/trunk/src/java/org/apache/hama/graph/SparseGraph.java
incubator/hama/trunk/src/java/org/apache/hama/io/VectorUpdate.java
incubator/hama/trunk/src/java/org/apache/hama/mapreduce/CollectBlocksReducer.java
incubator/hama/trunk/src/java/org/apache/hama/matrix/AbstractMatrix.java
incubator/hama/trunk/src/java/org/apache/hama/matrix/DenseMatrix.java
incubator/hama/trunk/src/java/org/apache/hama/matrix/algebra/BlockMultMap.java
incubator/hama/trunk/src/java/org/apache/hama/matrix/algebra/MatrixNormMapReduce.java
incubator/hama/trunk/src/java/org/apache/hama/util/BytesUtil.java
incubator/hama/trunk/src/test/org/apache/hama/matrix/TestSingularValueDecomposition.java
incubator/hama/trunk/src/test/org/apache/hama/matrix/TestSparseVector.java
incubator/hama/trunk/src/test/org/apache/hama/util/TestNumeric.java
Modified: incubator/hama/trunk/src/java/org/apache/hama/Constants.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/Constants.java?rev=830849&r1=830848&r2=830849&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/Constants.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/Constants.java Thu Oct 29 06:13:51 2009
@@ -58,9 +58,6 @@
public final static String ALIASEFAMILY = "aliase";
/** Default columnFamily name */
- @Deprecated
- public final static String COLUMN = "column:";
-
public static byte[] COLUMNFAMILY = Bytes.toBytes("column");
/** Temporary random matrices name prefix */
@@ -85,10 +82,31 @@
public static final int DEFAULT_TRY_TIMES = 10000000;
/** block data column */
- @Deprecated
- public static final String BLOCK = "block:";
-
- public static final String BLOCK_FAMILY = "block";
+ public static final String BLOCK = "block";
public static final Text ROWCOUNT= new Text("row");
+
+ /**
+ * EigenValue Constants
+ */
+ /** a matrix copy of the original copy collected in "eicol" family * */
+ public static final String EICOL = "eicol";
+
+ /** a column family collect all values and statuses used during computation * */
+ public static final String EI = "eival";
+ public static final String EIVAL = "value";
+ public static final String EICHANGED = "changed";
+
+ /** a column identify the index of the max absolute value each row * */
+ public static final String EIIND = "ind";
+
+ /** a matrix collect all the eigen vectors * */
+ public static final String EIVEC = "eivec";
+ public static final String MATRIX = "hama.jacobieigenvalue.matrix";
+
+ /** parameters for pivot * */
+ public static final String PIVOTROW = "hama.jacobi.pivot.row";
+ public static final String PIVOTCOL = "hama.jacobi.pivot.col";
+ public static final String PIVOTSIN = "hama.jacobi.pivot.sin";
+ public static final String PIVOTCOS = "hama.jacobi.pivot.cos";
}
Modified: incubator/hama/trunk/src/java/org/apache/hama/graph/SparseGraph.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/graph/SparseGraph.java?rev=830849&r1=830848&r2=830849&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/graph/SparseGraph.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/graph/SparseGraph.java Thu Oct 29 06:13:51 2009
@@ -121,7 +121,7 @@
StringBuilder result = new StringBuilder();
try {
- scan = table.getScanner(new String[] { Constants.COLUMN }, "");
+ scan = table.getScanner(new String[] { "column:" }, "");
Iterator<RowResult> it = scan.iterator();
while (it.hasNext()) {
RowResult rs = it.next();
Modified: incubator/hama/trunk/src/java/org/apache/hama/io/VectorUpdate.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/io/VectorUpdate.java?rev=830849&r1=830848&r2=830849&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/io/VectorUpdate.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/io/VectorUpdate.java Thu Oct 29 06:13:51 2009
@@ -24,7 +24,6 @@
import java.util.Map.Entry;
import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.io.BatchUpdate;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.MapWritable;
@@ -33,58 +32,29 @@
import org.apache.hama.util.BytesUtil;
public class VectorUpdate {
- private BatchUpdate batchUpdate;
private Put put;
public VectorUpdate(int i) {
- this.batchUpdate = new BatchUpdate(BytesUtil.getRowIndex(i));
this.put = new Put(BytesUtil.getRowIndex(i));
}
public VectorUpdate(String row) {
- this.batchUpdate = new BatchUpdate(row);
this.put = new Put(Bytes.toBytes(row));
}
public VectorUpdate(byte[] row) {
- this.batchUpdate = new BatchUpdate(row);
this.put = new Put(row);
}
public void put(int j, double value) {
- this.batchUpdate.put(BytesUtil.getColumnIndex(j), BytesUtil
- .doubleToBytes(value));
this.put.add(Constants.COLUMNFAMILY, Bytes.toBytes(String.valueOf(j)),
BytesUtil.doubleToBytes(value));
}
- /**
- * Put the value in "cfName+j"
- *
- * @param cfName
- * @param j
- * @param value
- */
public void put(String cfName, int j, double value) {
- this.batchUpdate.put(Bytes.toBytes(cfName + j), Bytes.toBytes(value));
this.put.add(Bytes.toBytes(cfName), Bytes.toBytes(String.valueOf(j)), Bytes.toBytes(value));
}
- public void put(String name, double value) {
- this.batchUpdate.put(Bytes.toBytes(name), Bytes.toBytes(value));
- }
-
- @Deprecated
- public void put(int j, String name) {
- this.batchUpdate.put(Bytes
- .toBytes((Bytes.toString(Constants.ATTRIBUTE) + j)), Bytes
- .toBytes(name));
- }
-
- public void put(String j, String val) {
- this.batchUpdate.put(j, Bytes.toBytes(val));
- }
-
public void put(String column, String qualifier, String val) {
this.put.add(Bytes.toBytes(column), Bytes.toBytes(qualifier), Bytes
.toBytes(val));
@@ -94,14 +64,6 @@
this.put.add(Bytes.toBytes(column), Bytes.toBytes(qualifier), Bytes
.toBytes(val));
}
-
- public void put(String row, int val) {
- this.batchUpdate.put(row, BytesUtil.intToBytes(val));
- }
-
- public BatchUpdate getBatchUpdate() {
- return this.batchUpdate;
- }
public void putAll(Map<Integer, Double> buffer) {
for (Map.Entry<Integer, Double> f : buffer.entrySet()) {
Modified: incubator/hama/trunk/src/java/org/apache/hama/mapreduce/CollectBlocksReducer.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/mapreduce/CollectBlocksReducer.java?rev=830849&r1=830848&r2=830849&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/mapreduce/CollectBlocksReducer.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/mapreduce/CollectBlocksReducer.java Thu Oct 29 06:13:51 2009
@@ -68,7 +68,7 @@
int seq = (x * mBlockNum) + key.getColumn() + r;
BlockID bkID = new BlockID(key.getRow(), x, seq);
Put put = new Put(bkID.getBytes());
- put.add(Bytes.toBytes(Constants.BLOCK_FAMILY),
+ put.add(Bytes.toBytes(Constants.BLOCK),
Bytes.toBytes("a"),
subMatrix.getBytes());
context.write(new ImmutableBytesWritable(bkID.getBytes()), put);
@@ -79,7 +79,7 @@
+ key.getRow();
BlockID bkID = new BlockID(x, key.getColumn(), seq);
Put put = new Put(bkID.getBytes());
- put.add(Bytes.toBytes(Constants.BLOCK_FAMILY),
+ put.add(Bytes.toBytes(Constants.BLOCK),
Bytes.toBytes("b"),
subMatrix.getBytes());
context.write(new ImmutableBytesWritable(bkID.getBytes()), put);
Added: incubator/hama/trunk/src/java/org/apache/hama/mapreduce/DummyMapper.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/mapreduce/DummyMapper.java?rev=830849&view=auto
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/mapreduce/DummyMapper.java (added)
+++ incubator/hama/trunk/src/java/org/apache/hama/mapreduce/DummyMapper.java Thu Oct 29 06:13:51 2009
@@ -0,0 +1,30 @@
+package org.apache.hama.mapreduce;
+
+import java.io.IOException;
+
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapreduce.Mapper;
+
+/**
+ * A mapper that consumes its input and emits nothing; useful when a job's
+ * work is done elsewhere, e.g. as a side effect of its RecordReader.
+ */
+public class DummyMapper<K, V> extends Mapper<K, V, K, V> {
+  /**
+   * Discards the record. Without this override the new-API
+   * {@link Mapper#map} default would forward every pair to the output.
+   */
+  @Override
+  protected void map(K key, V val, Context context) throws IOException,
+      InterruptedException {
+    // do nothing
+  }
+
+  /** Legacy old-API signature; the new-API framework never calls it. */
+  @Deprecated
+  public void map(K key, V val, OutputCollector<K, V> output, Reporter reporter)
+      throws IOException {
+    // do nothing
+  }
+}
Added: incubator/hama/trunk/src/java/org/apache/hama/mapreduce/PivotInputFormat.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/mapreduce/PivotInputFormat.java?rev=830849&view=auto
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/mapreduce/PivotInputFormat.java (added)
+++ incubator/hama/trunk/src/java/org/apache/hama/mapreduce/PivotInputFormat.java Thu Oct 29 06:13:51 2009
@@ -0,0 +1,330 @@
+package org.apache.hama.mapreduce;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.mapreduce.TableSplit;
+import org.apache.hadoop.hbase.util.Base64;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hama.Constants;
+import org.apache.hama.io.Pair;
+import org.apache.hama.util.BytesUtil;
+
+/**
+ * Input format for the pivot-selection step of the Jacobi eigenvalue
+ * computation. Each record is keyed by (row, column) where the column is the
+ * row's {@code EI}:{@code EIIND} cell, and carries the matching EICOL value. A
+ * reader also returns one sentinel key (Integer.MAX_VALUE, Integer.MAX_VALUE).
+ */
+public class PivotInputFormat extends InputFormat<Pair, DoubleWritable>
+    implements Configurable {
+  final static Log LOG = LogFactory.getLog(PivotInputFormat.class);
+
+  /** Job parameter that specifies the input table. */
+  public static final String INPUT_TABLE = "hama.mapreduce.inputtable";
+  /** Job parameter holding the Base64-encoded {@link Scan}. */
+  public static final String SCAN = "hama.mapreduce.scan";
+
+  /** The configuration. */
+  private Configuration conf = null;
+
+  /** Holds the details for the internal scanner. */
+  private Scan scan = null;
+  /** The table to scan. */
+  private HTable table = null;
+  /** The reader scanning the table, can be a custom one. */
+  private PivotRecordReader pivotRecordReader = null;
+
+  /**
+   * Creates one split per region of the input table.
+   *
+   * @throws IOException if no table is configured or it has no regions
+   */
+  @Override
+  public List<InputSplit> getSplits(JobContext context) throws IOException {
+    if (table == null) {
+      throw new IOException("No table was provided.");
+    }
+    byte[][] startKeys = table.getStartKeys();
+    if (startKeys == null || startKeys.length == 0) {
+      throw new IOException("Expecting at least one region.");
+    }
+    InputSplit[] splits = new InputSplit[startKeys.length];
+    for (int i = 0; i < startKeys.length; i++) {
+      // A split ends where the next region starts; the last one is open-ended.
+      byte[] endKey = (i + 1 < startKeys.length) ? startKeys[i + 1]
+          : HConstants.EMPTY_START_ROW;
+      String regionLocation = table.getRegionLocation(startKeys[i])
+          .getServerAddress().getHostname();
+      splits[i] = new TableSplit(this.table.getTableName(), startKeys[i],
+          endKey, regionLocation);
+      LOG.info("split: " + i + "->" + splits[i]);
+    }
+    return Arrays.asList(splits);
+  }
+
+  /**
+   * Returns one pivot candidate per scanned matrix row and then a single
+   * sentinel record. The last matrix row (index {@code size - 1}) is not
+   * returned as a candidate.
+   */
+  protected static class PivotRecordReader extends
+      RecordReader<Pair, DoubleWritable> {
+    private int totalRows;
+    private int processedRows;
+    /** Matrix dimension, loaded from the metadata row by {@link #init()}. */
+    private int size;
+    /** True until the end-of-split sentinel record has been returned. */
+    boolean mocked = true;
+
+    private ResultScanner scanner = null;
+    private Scan scan = null;
+    private HTable htable = null;
+    private byte[] lastRow = null;
+    private Pair key = null;
+    private DoubleWritable value = null;
+
+    @Override
+    public void close() {
+      if (this.scanner != null) {
+        this.scanner.close();
+      }
+    }
+
+    public void setScan(Scan scan) {
+      this.scan = scan;
+    }
+
+    public void setHTable(HTable htable) {
+      this.htable = htable;
+    }
+
+    /**
+     * Opens the scanner and loads the matrix size from the metadata row's
+     * "rows" attribute (0 if absent) so the last row can be recognised.
+     */
+    public void init() throws IOException {
+      restart(scan.getStartRow());
+
+      Get get = new Get(Bytes.toBytes(Constants.METADATA));
+      get.addFamily(Constants.ATTRIBUTE);
+      byte[] rows = htable.get(get).getValue(Constants.ATTRIBUTE,
+          Bytes.toBytes("rows"));
+      size = (rows != null) ? BytesUtil.bytesToInt(rows) : 0;
+    }
+
+    /** (Re)opens the scanner at {@code firstRow}, closing any previous one. */
+    public void restart(byte[] firstRow) throws IOException {
+      Scan newScan = new Scan(scan);
+      newScan.setStartRow(firstRow);
+      if (this.scanner != null) {
+        this.scanner.close();
+      }
+      this.scanner = this.htable.getScanner(newScan);
+    }
+
+    @Override
+    public Pair getCurrentKey() throws IOException, InterruptedException {
+      return key;
+    }
+
+    @Override
+    public DoubleWritable getCurrentValue() throws IOException,
+        InterruptedException {
+      return value;
+    }
+
+    @Override
+    public float getProgress() throws IOException, InterruptedException {
+      if (totalRows <= 0) {
+        return 0;
+      } else {
+        return Math.min(1.0f, processedRows / (float) totalRows);
+      }
+    }
+
+    @Override
+    public void initialize(InputSplit split, TaskAttemptContext context)
+        throws IOException, InterruptedException {
+      // Nothing to do: createRecordReader() already configured this reader.
+    }
+
+    @Override
+    public boolean nextKeyValue() throws IOException, InterruptedException {
+      if (key == null)
+        key = new Pair();
+      if (value == null)
+        value = new DoubleWritable();
+
+      Result vv = nextResult();
+      if (vv == null || vv.size() == 0) {
+        return emitSentinelOnce();
+      }
+
+      byte[] row = vv.getRow();
+      int rowId = BytesUtil.bytesToInt(row);
+      if (rowId == size - 1) { // skip the last row
+        return emitSentinelOnce();
+      }
+
+      byte[] col = vv.getValue(Bytes.toBytes(Constants.EI), Bytes
+          .toBytes(Constants.EIIND));
+      int colId = BytesUtil.bytesToInt(col);
+
+      // Fetch just the one EICOL cell that is needed, not the whole row.
+      byte[] family = Bytes.toBytes(Constants.EICOL);
+      byte[] qualifier = Bytes.toBytes(String.valueOf(colId));
+      Get get = new Get(BytesUtil.getRowIndex(rowId));
+      get.addColumn(family, qualifier);
+      byte[] cell = htable.get(get).getValue(family, qualifier);
+      double val = (cell != null) ? BytesUtil.bytesToDouble(cell) : 0;
+
+      key.set(rowId, colId);
+      value.set(val);
+
+      lastRow = row;
+      return true;
+    }
+
+    /**
+     * Advances the scanner. If it fails, the scan is reopened after the last
+     * row that was read (which is skipped) or, when no row has been read
+     * yet, from the split's start row.
+     */
+    private Result nextResult() throws IOException {
+      try {
+        return this.scanner.next();
+      } catch (IOException e) {
+        LOG.debug("recovered from " + StringUtils.stringifyException(e));
+        if (lastRow == null) {
+          restart(scan.getStartRow());
+        } else {
+          restart(lastRow);
+          scanner.next(); // skip presumed already mapped row
+        }
+        return scanner.next();
+      }
+    }
+
+    /**
+     * Sets the sentinel key and returns true on the first call; returns false
+     * on every later call, which ends the split.
+     */
+    private boolean emitSentinelOnce() {
+      if (!mocked) {
+        return false;
+      }
+      key.set(Integer.MAX_VALUE, Integer.MAX_VALUE);
+      mocked = false;
+      return true;
+    }
+  }
+
+  @Override
+  public RecordReader<Pair, DoubleWritable> createRecordReader(
+      InputSplit split, TaskAttemptContext context) throws IOException {
+    TableSplit tSplit = (TableSplit) split;
+    PivotRecordReader trr = this.pivotRecordReader;
+    // if no table record reader was provided use default
+    if (trr == null) {
+      trr = new PivotRecordReader();
+    }
+    Scan sc = new Scan(this.scan);
+    sc.setStartRow(tSplit.getStartRow());
+    sc.setStopRow(tSplit.getEndRow());
+    trr.setScan(sc);
+    trr.setHTable(table);
+    trr.init();
+    return trr;
+  }
+
+  protected HTable getHTable() {
+    return this.table;
+  }
+
+  protected void setHTable(HTable table) {
+    this.table = table;
+  }
+
+  /** Returns the configured scan, creating an empty one if none is set. */
+  public Scan getScan() {
+    if (this.scan == null)
+      this.scan = new Scan();
+    return scan;
+  }
+
+  public void setScan(Scan scan) {
+    this.scan = scan;
+  }
+
+  protected void setTableRecordReader(PivotRecordReader pivotRecordReader) {
+    this.pivotRecordReader = pivotRecordReader;
+  }
+
+  @Override
+  public Configuration getConf() {
+    return conf;
+  }
+
+  /**
+   * Opens the table named by {@link #INPUT_TABLE} and decodes the scan stored
+   * under {@link #SCAN}. Table errors and decoding IOExceptions are logged.
+   */
+  public void setConf(Configuration configuration) {
+    this.conf = configuration;
+    String tableName = conf.get(INPUT_TABLE);
+    try {
+      setHTable(new HTable(new HBaseConfiguration(conf), tableName));
+    } catch (Exception e) {
+      LOG.error(StringUtils.stringifyException(e));
+    }
+    Scan scan = null;
+    try {
+      scan = convertStringToScan(conf.get(SCAN));
+    } catch (IOException e) {
+      LOG.error("An error occurred.", e);
+    }
+    setScan(scan);
+  }
+
+  /** Serializes {@code scan} into a Base64 string suitable for a job parameter. */
+  public static String convertScanToString(Scan scan) throws IOException {
+    ByteArrayOutputStream out = new ByteArrayOutputStream();
+    DataOutputStream dos = new DataOutputStream(out);
+    scan.write(dos);
+    return Base64.encodeBytes(out.toByteArray());
+  }
+
+  /** Inverse of {@link #convertScanToString(Scan)}. */
+  public static Scan convertStringToScan(String base64) throws IOException {
+    ByteArrayInputStream bis = new ByteArrayInputStream(Base64.decode(base64));
+    DataInputStream dis = new DataInputStream(bis);
+    Scan scan = new Scan();
+    scan.readFields(dis);
+    return scan;
+  }
+
+}
Added: incubator/hama/trunk/src/java/org/apache/hama/mapreduce/RotationInputFormat.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/mapreduce/RotationInputFormat.java?rev=830849&view=auto
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/mapreduce/RotationInputFormat.java (added)
+++ incubator/hama/trunk/src/java/org/apache/hama/mapreduce/RotationInputFormat.java Thu Oct 29 06:13:51 2009
@@ -0,0 +1,399 @@
+package org.apache.hama.mapreduce;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.mapreduce.TableSplit;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hama.Constants;
+import org.apache.hama.io.VectorUpdate;
+import org.apache.hama.util.BytesUtil;
+
+/**
+ * Input format for the rotation step of the Jacobi eigenvalue computation.
+ * Its record reader applies the plane rotation defined by the pivot
+ * parameters ({@link Constants#PIVOTROW}, {@link Constants#PIVOTCOL},
+ * {@link Constants#PIVOTSIN}, {@link Constants#PIVOTCOS}) to the
+ * {@code EICOL} cells of the rows in its split as a side effect; the
+ * records themselves carry no data.
+ */
+public class RotationInputFormat extends
+    InputFormat<NullWritable, NullWritable> implements Configurable {
+  final static Log LOG = LogFactory.getLog(RotationInputFormat.class);
+  /** Job parameter that specifies the input table. */
+  public static final String INPUT_TABLE = "hama.mapreduce.inputtable";
+  /** Job parameter holding the Base64-encoded {@link Scan}. */
+  public static final String SCAN = "hama.mapreduce.scan";
+
+  /** The configuration. */
+  private Configuration conf = null;
+
+  /** Holds the details for the internal scanner. */
+  private Scan scan = null;
+  /** The table to scan. */
+  private HTable table = null;
+  /** The reader scanning the table, can be a custom one. */
+  private RotationRecordReader rotationRecordReader;
+
+  int pivot_row, pivot_col;
+  double pivot_cos, pivot_sin;
+
+  /**
+   * Rotates the {@code EICOL} cells touched by the rows of one split. Each
+   * call to {@link #nextKeyValue()} handles one scanned row and writes its
+   * updates straight back to the table.
+   */
+  protected static class RotationRecordReader extends
+      RecordReader<NullWritable, NullWritable> {
+    private ResultScanner scanner = null;
+    private Scan scan = null;
+    private HTable htable = null;
+    private byte[] lastRow = null;
+
+    private int totalRows;
+    private int processedRows;
+    int startRowId, endRowId = -1;
+    int size;
+
+    int pivotrow, pivotcol;
+    byte[] prow, pcol;
+    double pivotcos, pivotsin;
+
+    public RotationRecordReader(int pr, int pc, double psin, double pcos) {
+      super();
+      pivotrow = pr;
+      pivotcol = pc;
+      pivotsin = psin;
+      pivotcos = pcos;
+      prow = Bytes.toBytes(pivotrow);
+      pcol = Bytes.toBytes(pivotcol);
+      // Log the indices themselves; logging the byte[] fields printed only array identities.
+      LOG.info("pivot row: " + pivotrow + ", pivot col: " + pivotcol);
+    }
+
+    public void setScan(Scan scan) {
+      this.scan = scan;
+    }
+
+    public void setHTable(HTable htable) {
+      this.htable = htable;
+    }
+
+    /**
+     * Opens the scanner, loads the matrix size from the metadata row's "rows"
+     * attribute (0 if absent) and derives the split's row range for progress
+     * reporting.
+     */
+    public void init() throws IOException {
+      restart(scan.getStartRow());
+      byte[] startRow = scan.getStartRow();
+      byte[] endRow = scan.getStopRow();
+
+      Get get = new Get(Bytes.toBytes(Constants.METADATA));
+      get.addFamily(Constants.ATTRIBUTE);
+      byte[] result = htable.get(get).getValue(Constants.ATTRIBUTE,
+          Bytes.toBytes("rows"));
+
+      size = (result != null) ? BytesUtil.bytesToInt(result) : 0;
+
+      startRowId = (startRow.length == 0) ? 0 : BytesUtil.bytesToInt(startRow);
+      if (endRow.length == 0) {
+        // The last split is open-ended: it runs to the end of the matrix.
+        endRowId = -1;
+        totalRows = Math.max(0, size - startRowId);
+      } else {
+        endRowId = BytesUtil.bytesToInt(endRow);
+        // The split covers rows [startRowId, endRowId).
+        totalRows = endRowId - startRowId;
+      }
+      processedRows = 0;
+      LOG.info("Split (" + startRowId + ", " + endRowId + ") -> " + totalRows);
+    }
+
+    /** (Re)opens the scanner at {@code firstRow}, closing any previous one. */
+    public void restart(byte[] firstRow) throws IOException {
+      Scan newScan = new Scan(scan);
+      newScan.setStartRow(firstRow);
+      if (this.scanner != null) {
+        this.scanner.close();
+      }
+      this.scanner = this.htable.getScanner(newScan);
+    }
+
+    @Override
+    public void close() throws IOException {
+      if (this.scanner != null) {
+        this.scanner.close();
+      }
+    }
+
+    @Override
+    public NullWritable getCurrentKey() throws IOException,
+        InterruptedException {
+      return NullWritable.get();
+    }
+
+    @Override
+    public NullWritable getCurrentValue() throws IOException,
+        InterruptedException {
+      return NullWritable.get();
+    }
+
+    @Override
+    public float getProgress() throws IOException, InterruptedException {
+      if (totalRows <= 0) {
+        return 0;
+      } else {
+        return Math.min(1.0f, processedRows / (float) totalRows);
+      }
+    }
+
+    @Override
+    public void initialize(InputSplit split, TaskAttemptContext context)
+        throws IOException, InterruptedException {
+      // Nothing to do: createRecordReader() already configured this reader.
+    }
+
+    /**
+     * Processes the next scanned row. Returns false at the end of the split
+     * or at the first row past the pivot column.
+     */
+    @Override
+    public boolean nextKeyValue() throws IOException, InterruptedException {
+      Result vv = nextResult();
+      if (vv == null || vv.size() == 0) {
+        return false;
+      }
+
+      byte[] row = vv.getRow();
+      int rowId = BytesUtil.bytesToInt(row);
+      if (rowId < pivotrow) {
+        rotateAbovePivotRow(rowId);
+      } else if (rowId == pivotrow) {
+        // The pivot row itself is updated while handling the other rows.
+      } else if (rowId < pivotcol) {
+        rotateBetweenPivots(rowId);
+      } else if (rowId == pivotcol) {
+        rotateRightOfPivotCol();
+      } else { // rowId > pivotcol
+        return false;
+      }
+
+      lastRow = row;
+      processedRows++;
+      return true;
+    }
+
+    /**
+     * Handles {@code rowId < pivotrow}: rotates the row's own cells in
+     * columns pivotrow and pivotcol.
+     */
+    private void rotateAbovePivotRow(int rowId) throws IOException {
+      Result r = fetchEicolRow(rowId);
+      double s1 = eicol(r, pivotrow);
+      double s2 = eicol(r, pivotcol);
+
+      VectorUpdate bu = new VectorUpdate(rowId);
+      bu.put(Constants.EICOL, pivotrow, pivotcos * s1 - pivotsin * s2);
+      bu.put(Constants.EICOL, pivotcol, pivotsin * s1 + pivotcos * s2);
+      htable.put(bu.getPut());
+    }
+
+    /**
+     * Handles {@code pivotrow < rowId < pivotcol}: rotates cell (rowId,
+     * pivotcol) together with cell (pivotrow, rowId).
+     */
+    private void rotateBetweenPivots(int rowId) throws IOException {
+      double s1 = eicol(fetchEicolRow(pivotrow), rowId);
+      double s2 = eicol(fetchEicolRow(rowId), pivotcol);
+
+      VectorUpdate bu = new VectorUpdate(rowId);
+      bu.put(Constants.EICOL, pivotcol, pivotsin * s1 + pivotcos * s2);
+      htable.put(bu.getPut());
+
+      bu = new VectorUpdate(pivotrow);
+      bu.put(Constants.EICOL, rowId, pivotcos * s1 - pivotsin * s2);
+      htable.put(bu.getPut());
+    }
+
+    /**
+     * Handles {@code rowId == pivotcol}: rotates cells (pivotrow, i) and
+     * (pivotcol, i) for every column i after pivotcol.
+     */
+    private void rotateRightOfPivotCol() throws IOException {
+      // Read both rows once: columns after pivotcol of these two rows are
+      // written only by this loop, one column per iteration, after that
+      // column has been read.
+      Result pivotRowCells = fetchEicolRow(pivotrow);
+      Result pivotColCells = fetchEicolRow(pivotcol);
+      for (int i = pivotcol + 1; i < size; i++) {
+        double s1 = eicol(pivotRowCells, i);
+        double s2 = eicol(pivotColCells, i);
+
+        VectorUpdate bu = new VectorUpdate(pivotcol);
+        bu.put(Constants.EICOL, i, pivotsin * s1 + pivotcos * s2);
+        htable.put(bu.getPut());
+
+        bu = new VectorUpdate(pivotrow);
+        bu.put(Constants.EICOL, i, pivotcos * s1 - pivotsin * s2);
+        htable.put(bu.getPut());
+      }
+    }
+
+    /** Fetches the {@code EICOL} family of matrix row {@code rowId}. */
+    private Result fetchEicolRow(int rowId) throws IOException {
+      Get get = new Get(BytesUtil.getRowIndex(rowId));
+      get.addFamily(Bytes.toBytes(Constants.EICOL));
+      return htable.get(get);
+    }
+
+    /** Decodes the {@code EICOL} cell of {@code cells} at {@code column}. */
+    private static double eicol(Result cells, int column) {
+      return BytesUtil.bytesToDouble(cells.getValue(
+          Bytes.toBytes(Constants.EICOL), Bytes.toBytes(String.valueOf(column))));
+    }
+
+    /**
+     * Advances the scanner. If it fails, the scan is reopened after the last
+     * row that was read (which is skipped) or, when no row has been read
+     * yet, from the split's start row.
+     */
+    private Result nextResult() throws IOException {
+      try {
+        return this.scanner.next();
+      } catch (IOException e) {
+        LOG.debug("recovered from " + StringUtils.stringifyException(e));
+        if (lastRow == null) {
+          restart(scan.getStartRow());
+        } else {
+          restart(lastRow);
+          scanner.next(); // skip presumed already mapped row
+        }
+        return scanner.next();
+      }
+    }
+  }
+
+  @Override
+  public RecordReader<NullWritable, NullWritable> createRecordReader(
+      InputSplit split, TaskAttemptContext context) throws IOException,
+      InterruptedException {
+    TableSplit tSplit = (TableSplit) split;
+    RotationRecordReader trr = this.rotationRecordReader;
+    // if no table record reader was provided use default
+    if (trr == null) {
+      trr = new RotationRecordReader(pivot_row, pivot_col, pivot_sin, pivot_cos);
+    }
+    Scan sc = new Scan(this.scan);
+    sc.setStartRow(tSplit.getStartRow());
+    sc.setStopRow(tSplit.getEndRow());
+    trr.setScan(sc);
+    trr.setHTable(table);
+    trr.init();
+    return trr;
+  }
+
+  /**
+   * Creates one split per region of the input table.
+   *
+   * @throws IOException if no table is configured or it has no regions
+   */
+  @Override
+  public List<InputSplit> getSplits(JobContext context) throws IOException {
+    if (table == null) {
+      throw new IOException("No table was provided.");
+    }
+    byte[][] startKeys = table.getStartKeys();
+    if (startKeys == null || startKeys.length == 0) {
+      throw new IOException("Expecting at least one region.");
+    }
+    InputSplit[] splits = new InputSplit[startKeys.length];
+    for (int i = 0; i < startKeys.length; i++) {
+      // A split ends where the next region starts; the last one is open-ended.
+      byte[] endKey = (i + 1 < startKeys.length) ? startKeys[i + 1]
+          : HConstants.EMPTY_START_ROW;
+      String regionLocation = table.getRegionLocation(startKeys[i])
+          .getServerAddress().getHostname();
+      splits[i] = new TableSplit(this.table.getTableName(), startKeys[i],
+          endKey, regionLocation);
+      LOG.info("split: " + i + "->" + splits[i]);
+    }
+    return Arrays.asList(splits);
+  }
+
+  protected HTable getHTable() {
+    return this.table;
+  }
+
+  protected void setHTable(HTable table) {
+    this.table = table;
+  }
+
+  /** Returns the configured scan, creating an empty one if none is set. */
+  public Scan getScan() {
+    if (this.scan == null)
+      this.scan = new Scan();
+    return scan;
+  }
+
+  public void setScan(Scan scan) {
+    this.scan = scan;
+  }
+
+  protected void setTableRecordReader(RotationRecordReader rotationRecordReader) {
+    this.rotationRecordReader = rotationRecordReader;
+  }
+
+  @Override
+  public Configuration getConf() {
+    return conf;
+  }
+
+  /**
+   * Reads the pivot parameters, opens the table named by {@link #INPUT_TABLE}
+   * and decodes the scan stored under {@link #SCAN}. {@link Constants#PIVOTSIN}
+   * and {@link Constants#PIVOTCOS} have no default and must be set.
+   */
+  @Override
+  public void setConf(Configuration conf) {
+    pivot_row = conf.getInt(Constants.PIVOTROW, -1);
+    pivot_col = conf.getInt(Constants.PIVOTCOL, -1);
+    pivot_sin = Double.parseDouble(conf.get(Constants.PIVOTSIN));
+    pivot_cos = Double.parseDouble(conf.get(Constants.PIVOTCOS));
+
+    this.conf = conf;
+    String tableName = conf.get(INPUT_TABLE);
+    try {
+      setHTable(new HTable(new HBaseConfiguration(conf), tableName));
+    } catch (Exception e) {
+      LOG.error(StringUtils.stringifyException(e));
+    }
+    Scan scan = null;
+    try {
+      scan = PivotInputFormat.convertStringToScan(conf.get(SCAN));
+    } catch (IOException e) {
+      LOG.error("An error occurred.", e);
+    }
+    setScan(scan);
+  }
+
+}
Modified: incubator/hama/trunk/src/java/org/apache/hama/matrix/AbstractMatrix.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/matrix/AbstractMatrix.java?rev=830849&r1=830848&r2=830849&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/matrix/AbstractMatrix.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/matrix/AbstractMatrix.java Thu Oct 29 06:13:51 2009
@@ -54,7 +54,6 @@
import org.apache.hama.HamaAdminImpl;
import org.apache.hama.HamaConfiguration;
import org.apache.hama.io.VectorUpdate;
-import org.apache.hama.matrix.algebra.JacobiEigenValue;
import org.apache.hama.matrix.algebra.MatrixNormMapReduce;
import org.apache.hama.matrix.algebra.TransposeMap;
import org.apache.hama.matrix.algebra.TransposeReduce;
@@ -137,14 +136,14 @@
// It's a temporary data.
this.tableDesc.addFamily(new HColumnDescriptor(Bytes
- .toBytes(Constants.BLOCK_FAMILY)));
+ .toBytes(Constants.BLOCK)));
// the following families are used in JacobiEigenValue computation
this.tableDesc.addFamily(new HColumnDescriptor(Bytes
- .toBytes(JacobiEigenValue.EI_COLUMNFAMILY)));
+ .toBytes(Constants.EI)));
this.tableDesc.addFamily(new HColumnDescriptor(Bytes
- .toBytes(JacobiEigenValue.EICOL)));
+ .toBytes(Constants.EICOL)));
this.tableDesc.addFamily(new HColumnDescriptor(Bytes
- .toBytes(JacobiEigenValue.EIVEC)));
+ .toBytes(Constants.EIVEC)));
LOG.info("Initializing the matrix storage.");
this.admin.createTable(this.tableDesc);
@@ -460,10 +459,10 @@
scan.addFamily(Constants.COLUMNFAMILY);
scan.addFamily(Constants.ATTRIBUTE);
scan.addFamily(Bytes.toBytes(Constants.ALIASEFAMILY));
- scan.addFamily(Bytes.toBytes(Constants.BLOCK_FAMILY));
- scan.addFamily(Bytes.toBytes(JacobiEigenValue.EI_COLUMNFAMILY));
- scan.addFamily(Bytes.toBytes(JacobiEigenValue.EICOL_FAMILY));
- scan.addFamily(Bytes.toBytes(JacobiEigenValue.EIVEC_FAMILY));
+ scan.addFamily(Bytes.toBytes(Constants.BLOCK));
+ scan.addFamily(Bytes.toBytes(Constants.EI));
+ scan.addFamily(Bytes.toBytes(Constants.EICOL));
+ scan.addFamily(Bytes.toBytes(Constants.EIVEC));
org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.initTableMapperJob(B
.getPath(), scan, ScanMapper.class, ImmutableBytesWritable.class,
@@ -489,10 +488,10 @@
scan.addFamily(Constants.COLUMNFAMILY);
scan.addFamily(Constants.ATTRIBUTE);
scan.addFamily(Bytes.toBytes(Constants.ALIASEFAMILY));
- scan.addFamily(Bytes.toBytes(Constants.BLOCK_FAMILY));
- scan.addFamily(Bytes.toBytes(JacobiEigenValue.EI_COLUMNFAMILY));
- scan.addFamily(Bytes.toBytes(JacobiEigenValue.EICOL_FAMILY));
- scan.addFamily(Bytes.toBytes(JacobiEigenValue.EIVEC_FAMILY));
+ scan.addFamily(Bytes.toBytes(Constants.BLOCK));
+ scan.addFamily(Bytes.toBytes(Constants.EI));
+ scan.addFamily(Bytes.toBytes(Constants.EICOL));
+ scan.addFamily(Bytes.toBytes(Constants.EIVEC));
Float f = new Float(alpha);
job.getConfiguration().setFloat("set.alpha", f);
Modified: incubator/hama/trunk/src/java/org/apache/hama/matrix/DenseMatrix.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/matrix/DenseMatrix.java?rev=830849&r1=830848&r2=830849&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/matrix/DenseMatrix.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/matrix/DenseMatrix.java Thu Oct 29 06:13:51 2009
@@ -31,10 +31,12 @@
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.IdentityTableReducer;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.hbase.util.Bytes;
@@ -46,32 +48,31 @@
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.SequenceFile.CompressionType;
-import org.apache.hadoop.mapred.FileOutputFormat;
-import org.apache.hadoop.mapred.JobClient;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.SequenceFileOutputFormat;
-import org.apache.hadoop.mapred.lib.NullOutputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hama.Constants;
import org.apache.hama.HamaConfiguration;
import org.apache.hama.io.BlockID;
import org.apache.hama.io.DoubleEntry;
import org.apache.hama.io.Pair;
import org.apache.hama.io.VectorUpdate;
-import org.apache.hama.mapred.DummyMapper;
-import org.apache.hama.mapred.VectorInputFormat;
import org.apache.hama.mapreduce.CollectBlocksMapper;
+import org.apache.hama.mapreduce.DummyMapper;
+import org.apache.hama.mapreduce.PivotInputFormat;
import org.apache.hama.mapreduce.RandomMatrixMapper;
import org.apache.hama.mapreduce.RandomMatrixReducer;
+import org.apache.hama.mapreduce.RotationInputFormat;
import org.apache.hama.matrix.algebra.BlockMultMap;
import org.apache.hama.matrix.algebra.BlockMultReduce;
import org.apache.hama.matrix.algebra.DenseMatrixVectorMultMap;
import org.apache.hama.matrix.algebra.DenseMatrixVectorMultReduce;
-import org.apache.hama.matrix.algebra.JacobiEigenValue;
+import org.apache.hama.matrix.algebra.JacobiInitMap;
import org.apache.hama.matrix.algebra.MatrixAdditionMap;
import org.apache.hama.matrix.algebra.MatrixAdditionReduce;
+import org.apache.hama.matrix.algebra.PivotMap;
import org.apache.hama.util.BytesUtil;
import org.apache.hama.util.RandomVariable;
@@ -600,7 +601,7 @@
String collectionTable = "collect_" + RandomVariable.randMatrixPath();
HTableDescriptor desc = new HTableDescriptor(collectionTable);
desc
- .addFamily(new HColumnDescriptor(Bytes.toBytes(Constants.BLOCK_FAMILY)));
+ .addFamily(new HColumnDescriptor(Bytes.toBytes(Constants.BLOCK)));
this.admin.createTable(desc);
LOG.info("Collect Blocks");
@@ -613,7 +614,7 @@
Job job = new Job(config, "multiplication MR job : " + result.getPath());
Scan scan = new Scan();
- scan.addFamily(Bytes.toBytes(Constants.BLOCK_FAMILY));
+ scan.addFamily(Bytes.toBytes(Constants.BLOCK));
TableMapReduceUtil.initTableMapperJob(collectionTable, scan,
BlockMultMap.class, BlockID.class, BytesWritable.class, job);
@@ -773,30 +774,30 @@
* @throws IOException
*/
public void jacobiEigenValue(int loops) throws IOException {
- JobConf jobConf = new JobConf(config);
-
- /***************************************************************************
- * Initialization
- *
- * A M/R job is used for initialization(such as, preparing a matrx copy of
- * the original in "eicol:" family.)
- **************************************************************************/
+ /*
+ * Initialization: an M/R job prepares the working state, e.g. a copy of
+ * the original matrix in the "eicol:" family.
+ */
// initialization
- jobConf.setJobName("JacobiEigen initialization MR job" + getPath());
+ Job job = new Job(config, "JacobiEigen initialization MR job" + getPath());
- jobConf.setMapperClass(JacobiEigenValue.InitMapper.class);
- jobConf.setInputFormat(VectorInputFormat.class);
- jobConf.set(VectorInputFormat.COLUMN_LIST, Constants.COLUMN);
+ Scan scan = new Scan();
+ scan.addFamily(Constants.COLUMNFAMILY);
- org.apache.hadoop.mapred.FileInputFormat.addInputPaths(jobConf, getPath());
- jobConf.set(JacobiEigenValue.MATRIX, getPath());
- jobConf.setOutputFormat(NullOutputFormat.class);
- jobConf.setMapOutputKeyClass(IntWritable.class);
- jobConf.setMapOutputValueClass(MapWritable.class);
+ TableMapReduceUtil.initTableMapperJob(getPath(), scan, JacobiInitMap.class,
+ ImmutableBytesWritable.class, Put.class, job);
+ TableMapReduceUtil.initTableReducerJob(getPath(),
+ IdentityTableReducer.class, job);
- JobClient.runJob(jobConf);
+ try {
+ job.waitForCompletion(true);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ } catch (ClassNotFoundException e) {
+ e.printStackTrace();
+ }
- final FileSystem fs = FileSystem.get(jobConf);
+ final FileSystem fs = FileSystem.get(config);
Pair pivotPair = new Pair();
DoubleWritable pivotWritable = new DoubleWritable();
VectorUpdate vu;
@@ -809,45 +810,46 @@
double s, c, t, y;
while (state != 0 && loops > 0) {
- /*************************************************************************
- * Find the pivot and its index(pivot_row, pivot_col)
- *
- * A M/R job is used to scan all the "eival:ind" to get the max absolute
- * value of each row, and do a MAX aggregation of these max values to get
- * the max value in the matrix.
- ************************************************************************/
- jobConf = new JobConf(config);
- jobConf.setJobName("Find Pivot MR job" + getPath());
-
- jobConf.setNumReduceTasks(1);
-
+ /*
+ * Find the pivot and its index (pivot_row, pivot_col). An M/R job scans all
+ * the "eival:ind" entries to get the max absolute value of each row, then
+ * does a MAX aggregation of these per-row maxima to get the max value in the
+ * matrix.
+ */
Path outDir = new Path(new Path(getType() + "_TMP_FindPivot_dir_"
+ System.currentTimeMillis()), "out");
if (fs.exists(outDir))
fs.delete(outDir, true);
- jobConf.setMapperClass(JacobiEigenValue.PivotMapper.class);
- jobConf.setInputFormat(JacobiEigenValue.PivotInputFormat.class);
- jobConf.set(JacobiEigenValue.PivotInputFormat.COLUMN_LIST,
- JacobiEigenValue.EIIND);
- org.apache.hadoop.mapred.FileInputFormat
- .addInputPaths(jobConf, getPath());
- jobConf.setMapOutputKeyClass(Pair.class);
- jobConf.setMapOutputValueClass(DoubleWritable.class);
-
- jobConf.setOutputKeyClass(Pair.class);
- jobConf.setOutputValueClass(DoubleWritable.class);
- jobConf.setOutputFormat(SequenceFileOutputFormat.class);
- FileOutputFormat.setOutputPath(jobConf, outDir);
+ job = new Job(config, "Find Pivot MR job" + getPath());
- // update the out put dir of the job
- outDir = FileOutputFormat.getOutputPath(jobConf);
+ scan = new Scan();
+ scan.addFamily(Bytes.toBytes(Constants.EI));
- JobClient.runJob(jobConf);
+ job.setInputFormatClass(PivotInputFormat.class);
+ job.setMapOutputKeyClass(Pair.class);
+ job.setMapOutputValueClass(DoubleWritable.class);
+ job.setMapperClass(PivotMap.class);
+ job.getConfiguration().set(PivotInputFormat.INPUT_TABLE, getPath());
+ job.getConfiguration().set(PivotInputFormat.SCAN,
+ PivotInputFormat.convertScanToString(scan));
+
+ job.setOutputKeyClass(Pair.class);
+ job.setOutputValueClass(DoubleWritable.class);
+ job.setOutputFormatClass(SequenceFileOutputFormat.class);
+ SequenceFileOutputFormat.setOutputPath(job, outDir);
+
+ try {
+ job.waitForCompletion(true);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ } catch (ClassNotFoundException e) {
+ e.printStackTrace();
+ }
// read outputs
- Path inFile = new Path(outDir, "part-00000");
- SequenceFile.Reader reader = new SequenceFile.Reader(fs, inFile, jobConf);
+ Path inFile = new Path(outDir, "part-r-00000");
+ SequenceFile.Reader reader = new SequenceFile.Reader(fs, inFile, config);
try {
reader.next(pivotPair, pivotWritable);
pivot_row = pivotPair.getRow();
@@ -864,18 +866,18 @@
* Compute the rotation parameters of next rotation.
************************************************************************/
Get get = new Get(BytesUtil.getRowIndex(pivot_row));
- get.addFamily(Bytes.toBytes(JacobiEigenValue.EI_COLUMNFAMILY));
+ get.addFamily(Bytes.toBytes(Constants.EI));
Result r = table.get(get);
double e1 = BytesUtil.bytesToDouble(r.getValue(Bytes
- .toBytes(JacobiEigenValue.EI_COLUMNFAMILY), Bytes
- .toBytes(JacobiEigenValue.EI_VAL)));
+ .toBytes(Constants.EI), Bytes
+ .toBytes(Constants.EIVAL)));
get = new Get(BytesUtil.getRowIndex(pivot_col));
- get.addFamily(Bytes.toBytes(JacobiEigenValue.EI_COLUMNFAMILY));
+ get.addFamily(Bytes.toBytes(Constants.EI));
r = table.get(get);
double e2 = BytesUtil.bytesToDouble(r.getValue(Bytes
- .toBytes(JacobiEigenValue.EI_COLUMNFAMILY), Bytes
- .toBytes(JacobiEigenValue.EI_VAL)));
+ .toBytes(Constants.EI), Bytes
+ .toBytes(Constants.EIVAL)));
y = (e2 - e1) / 2;
t = Math.abs(y) + Math.sqrt(pivot * pivot + y * y);
@@ -888,61 +890,65 @@
t = -t;
}
- /*************************************************************************
+ /*
* Upate the pivot and the eigen values indexed by the pivot
- ************************************************************************/
+ */
vu = new VectorUpdate(pivot_row);
- vu.put(JacobiEigenValue.EICOL_FAMILY, pivot_col, 0);
+ vu.put(Constants.EICOL, pivot_col, 0);
table.put(vu.getPut());
state = update(pivot_row, -t, state);
state = update(pivot_col, t, state);
- /*************************************************************************
+ /*
* Rotation the matrix
- ************************************************************************/
- // rotation
- jobConf = new JobConf(config);
- jobConf.setJobName("Rotation Matrix MR job" + getPath());
-
- jobConf.setInt(JacobiEigenValue.PIVOTROW, pivot_row);
- jobConf.setInt(JacobiEigenValue.PIVOTCOL, pivot_col);
- jobConf.set(JacobiEigenValue.PIVOTSIN, String.valueOf(s));
- jobConf.set(JacobiEigenValue.PIVOTCOS, String.valueOf(c));
-
- jobConf.setMapperClass(DummyMapper.class);
- jobConf.setInputFormat(JacobiEigenValue.RotationInputFormat.class);
- jobConf.set(JacobiEigenValue.RotationInputFormat.COLUMN_LIST,
- JacobiEigenValue.EIIND);
- org.apache.hadoop.mapred.FileInputFormat
- .addInputPaths(jobConf, getPath());
- jobConf.setMapOutputKeyClass(NullWritable.class);
- jobConf.setMapOutputValueClass(NullWritable.class);
- org.apache.hadoop.mapred.FileInputFormat
- .addInputPaths(jobConf, getPath());
- jobConf.setOutputFormat(NullOutputFormat.class);
+ */
+ job = new Job(config, "Rotation Matrix MR job" + getPath());
- JobClient.runJob(jobConf);
+ scan = new Scan();
+ scan.addFamily(Bytes.toBytes(Constants.EI));
+
+ job.getConfiguration().setInt(Constants.PIVOTROW, pivot_row);
+ job.getConfiguration().setInt(Constants.PIVOTCOL, pivot_col);
+ job.getConfiguration().set(Constants.PIVOTSIN, String.valueOf(s));
+ job.getConfiguration().set(Constants.PIVOTCOS, String.valueOf(c));
+
+ job.setInputFormatClass(RotationInputFormat.class);
+ job.setMapOutputKeyClass(NullWritable.class);
+ job.setMapOutputValueClass(NullWritable.class);
+ job.setMapperClass(DummyMapper.class);
+ job.getConfiguration().set(RotationInputFormat.INPUT_TABLE, getPath());
+ job.getConfiguration().set(RotationInputFormat.SCAN,
+ PivotInputFormat.convertScanToString(scan));
+ job.setOutputFormatClass(NullOutputFormat.class);
+
+ try {
+ job.waitForCompletion(true);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ } catch (ClassNotFoundException e) {
+ e.printStackTrace();
+ }
// rotate eigenvectors
LOG.info("rotating eigenvector");
for (int i = 0; i < size; i++) {
get = new Get(BytesUtil.getRowIndex(pivot_row));
e1 = BytesUtil.bytesToDouble(table.get(get).getValue(
- Bytes.toBytes(JacobiEigenValue.EIVEC_FAMILY),
+ Bytes.toBytes(Constants.EIVEC),
Bytes.toBytes(String.valueOf(i))));
get = new Get(BytesUtil.getRowIndex(pivot_col));
e2 = BytesUtil.bytesToDouble(table.get(get).getValue(
- Bytes.toBytes(JacobiEigenValue.EIVEC_FAMILY),
+ Bytes.toBytes(Constants.EIVEC),
Bytes.toBytes(String.valueOf(i))));
vu = new VectorUpdate(pivot_row);
- vu.put(JacobiEigenValue.EIVEC_FAMILY, i, c * e1 - s * e2);
+ vu.put(Constants.EIVEC, i, c * e1 - s * e2);
table.put(vu.getPut());
vu = new VectorUpdate(pivot_col);
- vu.put(JacobiEigenValue.EIVEC_FAMILY, i, s * e1 + c * e2);
+ vu.put(Constants.EIVEC, i, s * e1 + c * e2);
table.put(vu.getPut());
}
@@ -962,13 +968,13 @@
get = new Get(BytesUtil.getRowIndex(row));
double max = BytesUtil.bytesToDouble(table.get(get).getValue(
- Bytes.toBytes(JacobiEigenValue.EICOL_FAMILY),
+ Bytes.toBytes(Constants.EICOL),
Bytes.toBytes(String.valueOf(m))));
double val;
for (int i = row + 2; i < size; i++) {
get = new Get(BytesUtil.getRowIndex(row));
val = BytesUtil.bytesToDouble(table.get(get).getValue(
- Bytes.toBytes(JacobiEigenValue.EICOL_FAMILY),
+ Bytes.toBytes(Constants.EICOL),
Bytes.toBytes(String.valueOf(i))));
if (Math.abs(val) > Math.abs(max)) {
m = i;
@@ -978,34 +984,34 @@
}
VectorUpdate vu = new VectorUpdate(row);
- vu.put(JacobiEigenValue.EI_COLUMNFAMILY, "ind", String.valueOf(m));
+ vu.put(Constants.EI, "ind", String.valueOf(m));
table.put(vu.getPut());
}
int update(int row, double value, int state) throws IOException {
Get get = new Get(BytesUtil.getRowIndex(row));
double e = BytesUtil.bytesToDouble(table.get(get).getValue(
- Bytes.toBytes(JacobiEigenValue.EI_COLUMNFAMILY),
- Bytes.toBytes(JacobiEigenValue.EI_VAL)));
+ Bytes.toBytes(Constants.EI),
+ Bytes.toBytes(Constants.EIVAL)));
int changed = BytesUtil.bytesToInt(table.get(get).getValue(
- Bytes.toBytes(JacobiEigenValue.EI_COLUMNFAMILY),
+ Bytes.toBytes(Constants.EI),
Bytes.toBytes("changed")));
double y = e;
e += value;
VectorUpdate vu = new VectorUpdate(row);
- vu.put(JacobiEigenValue.EI_COLUMNFAMILY, JacobiEigenValue.EI_VAL, e);
+ vu.put(Constants.EI, Constants.EIVAL, e);
if (changed == 1 && (Math.abs(y - e) < .0000001)) { // y == e) {
changed = 0;
- vu.put(JacobiEigenValue.EI_COLUMNFAMILY,
- JacobiEigenValue.EICHANGED_STRING, String.valueOf(changed));
+ vu.put(Constants.EI,
+ Constants.EICHANGED, String.valueOf(changed));
state--;
} else if (changed == 0 && (Math.abs(y - e) > .0000001)) {
changed = 1;
- vu.put(JacobiEigenValue.EI_COLUMNFAMILY,
- JacobiEigenValue.EICHANGED_STRING, String.valueOf(changed));
+ vu.put(Constants.EI,
+ Constants.EICHANGED, String.valueOf(changed));
state++;
}
@@ -1021,8 +1027,8 @@
for (int i = 0; i < e.length; i++) {
get = new Get(BytesUtil.getRowIndex(i));
e1 = BytesUtil.bytesToDouble(table.get(get).getValue(
- Bytes.toBytes(JacobiEigenValue.EI_COLUMNFAMILY),
- Bytes.toBytes(JacobiEigenValue.EI_VAL)));
+ Bytes.toBytes(Constants.EI),
+ Bytes.toBytes(Constants.EIVAL)));
success &= ((Math.abs(e1 - e[i]) < .0000001));
if (!success)
return success;
@@ -1030,7 +1036,7 @@
for (int j = 0; j < E[i].length; j++) {
get = new Get(BytesUtil.getRowIndex(i));
ev = BytesUtil.bytesToDouble(table.get(get).getValue(
- Bytes.toBytes(JacobiEigenValue.EIVEC_FAMILY),
+ Bytes.toBytes(Constants.EIVEC),
Bytes.toBytes(String.valueOf(j))));
success &= ((Math.abs(ev - E[i][j]) < .0000001));
if (!success)
Modified: incubator/hama/trunk/src/java/org/apache/hama/matrix/algebra/BlockMultMap.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/matrix/algebra/BlockMultMap.java?rev=830849&r1=830848&r2=830849&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/matrix/algebra/BlockMultMap.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/matrix/algebra/BlockMultMap.java Thu Oct 29 06:13:51 2009
@@ -12,7 +12,7 @@
import org.apache.hama.matrix.SubMatrix;
public class BlockMultMap extends TableMapper<BlockID, BytesWritable> {
- private byte[] COLUMN = Bytes.toBytes(Constants.BLOCK_FAMILY);
+ private byte[] COLUMN = Bytes.toBytes(Constants.BLOCK);
public void map(ImmutableBytesWritable key, Result value, Context context)
throws IOException, InterruptedException {
Added: incubator/hama/trunk/src/java/org/apache/hama/matrix/algebra/JacobiInitMap.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/matrix/algebra/JacobiInitMap.java?rev=830849&view=auto
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/matrix/algebra/JacobiInitMap.java (added)
+++ incubator/hama/trunk/src/java/org/apache/hama/matrix/algebra/JacobiInitMap.java Thu Oct 29 06:13:51 2009
@@ -0,0 +1,75 @@
+package org.apache.hama.matrix.algebra;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.NavigableMap;
+
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.TableMapper;
+import org.apache.hama.Constants;
+import org.apache.hama.io.VectorUpdate;
+import org.apache.hama.util.BytesUtil;
+
+/**
+ * Initialization mapper for the Jacobi eigenvalue computation.
+ *
+ * <p>The Jacobi rotations modify the matrix while they run. So instead of
+ * touching the original {@code Constants.COLUMNFAMILY} cells, this mapper copies
+ * every cell of a row into the {@code Constants.EICOL} family of the same table.
+ * All later modifications work on that copy.
+ *
+ * <p>For each input row it additionally writes:
+ * <ul>
+ * <li>{@code Constants.EIVEC}: one cell per column present in the row, 1 on the
+ * diagonal and 0 elsewhere (the starting eigenvector matrix);</li>
+ * <li>{@code Constants.EI}:{@code Constants.EIVAL}: the diagonal entry, used as
+ * the initial eigenvalue (written only when the diagonal cell exists);</li>
+ * <li>{@code Constants.EI}:{@code Constants.EIIND}: the column index of the
+ * largest-magnitude entry strictly right of the diagonal, or {@code row + 1}
+ * when there is none;</li>
+ * <li>{@code Constants.EI}:{@code Constants.EICHANGED}: the string "1", marking
+ * the row as changed.</li>
+ * </ul>
+ *
+ * <p>All of this is emitted as a single {@link Put} keyed by the input row key.
+ */
+public class JacobiInitMap extends TableMapper<ImmutableBytesWritable, Put> {
+
+  /**
+   * Builds the initial Jacobi state for one matrix row.
+   *
+   * @param key the row key of the matrix row being initialized
+   * @param value the row's cells; only {@code Constants.COLUMNFAMILY} is read
+   * @param context receives exactly one {@link Put} for {@code key}
+   * @throws IOException if writing to the context fails
+   * @throws InterruptedException if the task is interrupted while writing
+   */
+  @Override
+  public void map(ImmutableBytesWritable key, Result value, Context context)
+      throws IOException, InterruptedException {
+    int row = BytesUtil.getRowIndex(key.get());
+    VectorUpdate vu = new VectorUpdate(row);
+
+    // Column of the largest-magnitude entry right of the diagonal; defaults to
+    // row + 1 when the row has no such entry.
+    int maxInd = row + 1;
+    double maxVal = 0;
+    boolean found = false;
+
+    NavigableMap<byte[], byte[]> map = value
+        .getFamilyMap(Constants.COLUMNFAMILY);
+    if (map != null) {
+      for (Map.Entry<byte[], byte[]> e : map.entrySet()) {
+        double val = BytesUtil.bytesToDouble(e.getValue());
+        int col = BytesUtil.bytesToInt(e.getKey());
+        // copy the original matrix into the "EICOL" working family
+        vu.put(Constants.EICOL, col, val);
+        // start the eigenvector matrix with 1 on the diagonal, 0 elsewhere
+        vu.put(Constants.EIVEC, col, col == row ? 1 : 0);
+        if (col == row) {
+          // the diagonal entry is the initial eigenvalue
+          vu.put(Constants.EI, Constants.EIVAL, val);
+        }
+        // Only entries right of the diagonal compete; the strict '>' keeps the
+        // first column (in map iteration order) on ties.
+        if (col > row && (!found || Math.abs(val) > Math.abs(maxVal))) {
+          maxInd = col;
+          maxVal = val;
+          found = true;
+        }
+      }
+    }
+
+    // index array
+    vu.put(Constants.EI, Constants.EIIND, String.valueOf(maxInd));
+    // every row starts out flagged as changed
+    vu.put(Constants.EI, Constants.EICHANGED, String.valueOf(1));
+    context.write(key, vu.getPut());
+  }
+}
Modified: incubator/hama/trunk/src/java/org/apache/hama/matrix/algebra/MatrixNormMapReduce.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/matrix/algebra/MatrixNormMapReduce.java?rev=830849&r1=830848&r2=830849&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/matrix/algebra/MatrixNormMapReduce.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/matrix/algebra/MatrixNormMapReduce.java Thu Oct 29 06:13:51 2009
@@ -14,6 +14,7 @@
import org.apache.hama.util.BytesUtil;
import org.apache.log4j.Logger;
+/** A catalog class that collects all the MapReduce classes used to compute a matrix's norm. */
public class MatrixNormMapReduce {
public final static IntWritable nKey = new IntWritable(-1);
Added: incubator/hama/trunk/src/java/org/apache/hama/matrix/algebra/PivotMap.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/matrix/algebra/PivotMap.java?rev=830849&view=auto
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/matrix/algebra/PivotMap.java (added)
+++ incubator/hama/trunk/src/java/org/apache/hama/matrix/algebra/PivotMap.java Thu Oct 29 06:13:51 2009
@@ -0,0 +1,29 @@
+package org.apache.hama.matrix.algebra;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hama.io.Pair;
+
+public class PivotMap extends
+ Mapper<Pair, DoubleWritable, Pair, DoubleWritable> {
+ private double max = 0;
+ private Pair pair = new Pair(0, 0);
+ private Pair dummyPair = new Pair(Integer.MAX_VALUE, Integer.MAX_VALUE);
+ private DoubleWritable dummyVal = new DoubleWritable(0.0);
+
+ public void map(Pair key, DoubleWritable value, Context context)
+ throws IOException, InterruptedException {
+ if (key.getRow() != Integer.MAX_VALUE) {
+ if (Math.abs(value.get()) > Math.abs(max)) {
+ pair.set(key.getRow(), key.getColumn());
+ max = value.get();
+ }
+ } else {
+ context.write(pair, new DoubleWritable(max));
+ context.write(dummyPair, dummyVal);
+ }
+ }
+
+}
Modified: incubator/hama/trunk/src/java/org/apache/hama/util/BytesUtil.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/util/BytesUtil.java?rev=830849&r1=830848&r2=830849&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/util/BytesUtil.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/util/BytesUtil.java Thu Oct 29 06:13:51 2009
@@ -130,16 +130,6 @@
.substring(cKey.indexOf(":") + 1, cKey.length()));
}
- /**
- * Gets the column index
- *
- * @param integer
- * @return the converted value
- */
- public static byte[] getColumnIndex(int integer) {
- return Bytes.toBytes(Constants.COLUMN + String.valueOf(integer));
- }
-
public static int getBlockIndex(byte[] key) {
String cKey = new String(key);
return Integer.parseInt(cKey
Modified: incubator/hama/trunk/src/test/org/apache/hama/matrix/TestSingularValueDecomposition.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/test/org/apache/hama/matrix/TestSingularValueDecomposition.java?rev=830849&r1=830848&r2=830849&view=diff
==============================================================================
--- incubator/hama/trunk/src/test/org/apache/hama/matrix/TestSingularValueDecomposition.java (original)
+++ incubator/hama/trunk/src/test/org/apache/hama/matrix/TestSingularValueDecomposition.java Thu Oct 29 06:13:51 2009
@@ -26,9 +26,9 @@
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hama.Constants;
import org.apache.hama.HamaCluster;
import org.apache.hama.HamaConfiguration;
-import org.apache.hama.matrix.algebra.JacobiEigenValue;
import org.apache.hama.util.BytesUtil;
import org.apache.log4j.Logger;
@@ -80,12 +80,12 @@
for (int x = 0; x < 2; x++) {
Get get = new Get(BytesUtil.getRowIndex(x));
- get.addColumn(Bytes.toBytes(JacobiEigenValue.EI_COLUMNFAMILY));
+ get.addColumn(Bytes.toBytes(Constants.EI));
Result r = table.get(get);
double eigenvalue = BytesUtil.bytesToDouble(r.getValue(Bytes
- .toBytes(JacobiEigenValue.EI_COLUMNFAMILY), Bytes
- .toBytes(JacobiEigenValue.EI_VAL)));
+ .toBytes(Constants.EI), Bytes
+ .toBytes(Constants.EIVAL)));
assertTrue(Math.abs(eigenvalues[x] - eigenvalue) < .0000001);
assertTrue(Math.abs(Math.pow(eigenvalue, 0.5) - singularvalues[x]) < .0000001);
}
Modified: incubator/hama/trunk/src/test/org/apache/hama/matrix/TestSparseVector.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/test/org/apache/hama/matrix/TestSparseVector.java?rev=830849&r1=830848&r2=830849&view=diff
==============================================================================
--- incubator/hama/trunk/src/test/org/apache/hama/matrix/TestSparseVector.java (original)
+++ incubator/hama/trunk/src/test/org/apache/hama/matrix/TestSparseVector.java Thu Oct 29 06:13:51 2009
@@ -27,6 +27,8 @@
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hama.Constants;
import org.apache.hama.HamaCluster;
import org.apache.hama.util.BytesUtil;
@@ -67,7 +69,7 @@
HTable table = m1.getHTable();
Get get = new Get(BytesUtil.getRowIndex(0));
- get.addColumn(BytesUtil.getColumnIndex(1));
+ get.addColumn(Constants.COLUMNFAMILY, Bytes.toBytes(1));
Result r = table.get(get);
assertTrue(r.getCellValue() == null);
Modified: incubator/hama/trunk/src/test/org/apache/hama/util/TestNumeric.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/test/org/apache/hama/util/TestNumeric.java?rev=830849&r1=830848&r2=830849&view=diff
==============================================================================
--- incubator/hama/trunk/src/test/org/apache/hama/util/TestNumeric.java (original)
+++ incubator/hama/trunk/src/test/org/apache/hama/util/TestNumeric.java Thu Oct 29 06:13:51 2009
@@ -42,13 +42,4 @@
assertEquals(BytesUtil.bytesToDouble(BytesUtil.doubleToBytes(TEST_DOUBLE)),
TEST_DOUBLE);
}
-
- /**
- * Get the column index from hbase.
- */
- public void testGetColumnIndex() {
- byte[] result = BytesUtil.getColumnIndex(3);
- assertEquals(Bytes.toString(result), Constants.COLUMN
- + BytesUtil.getColumnIndex(result));
- }
}