Posted to commits@hama.apache.org by ed...@apache.org on 2009/09/10 07:32:59 UTC
svn commit: r813233 [4/7] - in /incubator/hama/branches: ./ hama-0.19/
hama-0.19/bin/ hama-0.19/conf/ hama-0.19/lib/ hama-0.19/lib/findbugs/
hama-0.19/lib/findbugs/plugin/ hama-0.19/lib/jetty-ext/ hama-0.19/src/
hama-0.19/src/docs/ hama-0.19/src/docs/s...
Added: incubator/hama/branches/hama-0.19/src/java/org/apache/hama/SparseMatrix.java
URL: http://svn.apache.org/viewvc/incubator/hama/branches/hama-0.19/src/java/org/apache/hama/SparseMatrix.java?rev=813233&view=auto
==============================================================================
--- incubator/hama/branches/hama-0.19/src/java/org/apache/hama/SparseMatrix.java (added)
+++ incubator/hama/branches/hama-0.19/src/java/org/apache/hama/SparseMatrix.java Thu Sep 10 05:32:52 2009
@@ -0,0 +1,294 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama;
+
+import java.io.IOException;
+import java.util.Random;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.io.Cell;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.MapWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.SequenceFileInputFormat;
+import org.apache.hama.algebra.SparseMatrixVectorMultMap;
+import org.apache.hama.algebra.SparseMatrixVectorMultReduce;
+import org.apache.hama.io.VectorUpdate;
+import org.apache.hama.mapred.RandomMatrixMap;
+import org.apache.hama.mapred.RandomMatrixReduce;
+import org.apache.hama.util.BytesUtil;
+import org.apache.hama.util.JobManager;
+import org.apache.hama.util.RandomVariable;
+
+public class SparseMatrix extends AbstractMatrix implements Matrix {
+ static private final String TABLE_PREFIX = SparseMatrix.class.getSimpleName();
+ static private final Path TMP_DIR =
+ new Path(SparseMatrix.class.getSimpleName() + "_TMP_dir");
+
+ public SparseMatrix(HamaConfiguration conf, int m, int n) throws IOException {
+ setConfiguration(conf);
+
+ tryToCreateTable(TABLE_PREFIX);
+ closed = false;
+ this.setDimension(m, n);
+ }
+
+ /**
+ * Loads a matrix from an existing matrix table whose table name is 'matrixpath'.
+ * It is used internally by the map/reduce jobs.
+ *
+ * @param conf configuration object
+ * @param matrixpath
+ * @throws IOException
+ */
+ public SparseMatrix(HamaConfiguration conf, String matrixpath)
+ throws IOException {
+ setConfiguration(conf);
+ matrixPath = matrixpath;
+ // load the matrix
+ table = new HTable(conf, matrixPath);
+ // TODO: we don't currently increment the reference count of the table,
+ // since this constructor is for internal map/reduce use. If we did
+ // increment it, we would not know where to call Matrix.close in the
+ // Add & Mul map/reduce processes to decrement it again. It seems difficult.
+ }
+
+ /**
+ * Generate matrix with random elements
+ *
+ * @param conf configuration object
+ * @param m the number of rows.
+ * @param n the number of columns.
+ * @return an m-by-n matrix with uniformly distributed random elements.
+ * @throws IOException
+ */
+ public static SparseMatrix random(HamaConfiguration conf, int m, int n)
+ throws IOException {
+ SparseMatrix rand = new SparseMatrix(conf, m, n);
+ SparseVector vector = new SparseVector();
+ LOG.info("Create the " + m + " * " + n + " random matrix : "
+ + rand.getPath());
+
+ Random r = new Random();
+ for (int i = 0; i < m; i++) {
+ vector.clear();
+ for (int j = 0; j < n; j++) {
+ // reuse a single Random instead of re-seeding a new one per element
+ if (r.nextInt(2) != 0)
+ vector.set(j, RandomVariable.rand());
+ }
+ rand.setRow(i, vector);
+ }
+
+ return rand;
+ }
+
+ public static SparseMatrix random_mapred(HamaConfiguration conf, int m,
+ int n, double percent) throws IOException {
+ SparseMatrix rand = new SparseMatrix(conf, m, n);
+ LOG.info("Create the " + m + " * " + n + " random matrix : "
+ + rand.getPath());
+
+ JobConf jobConf = new JobConf(conf);
+ jobConf.setJobName("random matrix MR job : " + rand.getPath());
+
+ jobConf.setNumMapTasks(conf.getNumMapTasks());
+ jobConf.setNumReduceTasks(conf.getNumReduceTasks());
+
+ final Path inDir = new Path(TMP_DIR, "in");
+ FileInputFormat.setInputPaths(jobConf, inDir);
+ jobConf.setMapperClass(RandomMatrixMap.class);
+ jobConf.setMapOutputKeyClass(IntWritable.class);
+ jobConf.setMapOutputValueClass(MapWritable.class);
+
+ RandomMatrixReduce.initJob(rand.getPath(), RandomMatrixReduce.class,
+ jobConf);
+ jobConf.setSpeculativeExecution(false);
+ jobConf.setInt("matrix.column", n);
+ jobConf.set("matrix.type", TABLE_PREFIX);
+ jobConf.set("matrix.density", String.valueOf(percent));
+
+ jobConf.setInputFormat(SequenceFileInputFormat.class);
+ final FileSystem fs = FileSystem.get(jobConf);
+ int interval = m / conf.getNumMapTasks();
+
+ // generate an input file for each map task
+ for (int i = 0; i < conf.getNumMapTasks(); ++i) {
+ final Path file = new Path(inDir, "part" + i);
+ final IntWritable start = new IntWritable(i * interval);
+ IntWritable end = null;
+ if ((i + 1) != conf.getNumMapTasks()) {
+ end = new IntWritable(((i * interval) + interval) - 1);
+ } else {
+ end = new IntWritable(m - 1);
+ }
+ final SequenceFile.Writer writer = SequenceFile.createWriter(fs, jobConf,
+ file, IntWritable.class, IntWritable.class, CompressionType.NONE);
+ try {
+ writer.append(start, end);
+ } finally {
+ writer.close();
+ }
+ System.out.println("Wrote input for Map #" + i);
+ }
+
+ JobClient.runJob(jobConf);
+ fs.delete(TMP_DIR, true);
+ return rand;
+ }
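To make the split arithmetic above concrete, a worked example: with m = 10
rows and conf.getNumMapTasks() = 3, interval = 10 / 3 = 3, so the generated
per-task inputs are

    part0: rows 0-2
    part1: rows 3-5
    part2: rows 6-9   (the last task absorbs the remainder, ending at m - 1)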
+
+ @Override
+ public Matrix add(Matrix B) throws IOException {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public Matrix add(double alpha, Matrix B) throws IOException {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public double get(int i, int j) throws IOException {
+ if (i >= this.getRows() || j >= this.getColumns())
+ throw new ArrayIndexOutOfBoundsException(i + ", " + j);
+
+ Cell c = table.get(BytesUtil.getRowIndex(i), BytesUtil.getColumnIndex(j));
+ return (c != null) ? BytesUtil.bytesToDouble(c.getValue()) : 0.0;
+ }
+
+ @Override
+ public Vector getColumn(int j) throws IOException {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ /**
+ * Gets the vector of the ith row
+ *
+ * @param i the row index of the matrix
+ * @return the vector of the ith row
+ * @throws IOException
+ */
+ public SparseVector getRow(int i) throws IOException {
+ return new SparseVector(table.getRow(BytesUtil.getRowIndex(i),
+ new byte[][] { Bytes.toBytes(Constants.COLUMN) }));
+ }
+
+ /** {@inheritDoc} */
+ public void set(int i, int j, double value) throws IOException {
+ if(value != 0) {
+ VectorUpdate update = new VectorUpdate(i);
+ update.put(j, value);
+ table.commit(update.getBatchUpdate());
+ }
+ }
+
+ /**
+ * Returns the type of the matrix
+ */
+ public String getType() {
+ return this.getClass().getSimpleName();
+ }
+
+ /**
+ * C = A*B using an iterative method: one map/reduce job is launched per row of A
+ *
+ * @param B
+ * @return C
+ * @throws IOException
+ */
+ public SparseMatrix mult(Matrix B) throws IOException {
+ SparseMatrix result = new SparseMatrix(config, this.getRows(), B.getColumns());
+
+ for(int i = 0; i < this.getRows(); i++) {
+ JobConf jobConf = new JobConf(config);
+ jobConf.setJobName("multiplication MR job : " + result.getPath() + " " + i);
+
+ jobConf.setNumMapTasks(config.getNumMapTasks());
+ jobConf.setNumReduceTasks(config.getNumReduceTasks());
+
+ SparseMatrixVectorMultMap.initJob(i, this.getPath(), B.getPath(), SparseMatrixVectorMultMap.class,
+ IntWritable.class, MapWritable.class, jobConf);
+ SparseMatrixVectorMultReduce.initJob(result.getPath(), SparseMatrixVectorMultReduce.class,
+ jobConf);
+ JobManager.execute(jobConf);
+ }
+
+ return result;
+ }
+
+ @Override
+ public Matrix multAdd(double alpha, Matrix B, Matrix C) throws IOException {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ /**
+ * Computes the given norm of the matrix
+ *
+ * @param type
+ * @return norm of the matrix
+ * @throws IOException
+ */
+ public double norm(Norm type) throws IOException {
+ if (type == Norm.One)
+ return getNorm1();
+ else if (type == Norm.Frobenius)
+ return getFrobenius();
+ else if (type == Norm.Infinity)
+ return getInfinity();
+ else
+ return getMaxvalue();
+ }
+
+ @Override
+ public void setColumn(int column, Vector vector) throws IOException {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ public void setRow(int row, Vector vector) throws IOException {
+ if (row >= this.getRows())
+ throw new ArrayIndexOutOfBoundsException(row);
+
+ if(vector.size() > 0) { // stores if size > 0
+ VectorUpdate update = new VectorUpdate(row);
+ update.putAll(((SparseVector) vector).getEntries());
+ table.commit(update.getBatchUpdate());
+ }
+ }
+
+ @Override
+ public SubMatrix subMatrix(int i0, int i1, int j0, int j1) throws IOException {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+}
\ No newline at end of file
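For context, a minimal sketch of how a client might drive the class above
(illustrative only; it assumes a running HBase cluster reachable through the
HamaConfiguration defaults, and is not part of this commit):

    import org.apache.hama.HamaConfiguration;
    import org.apache.hama.SparseMatrix;

    public class SparseMatrixExample {
      public static void main(String[] args) throws Exception {
        HamaConfiguration conf = new HamaConfiguration();

        // 10 x 10 matrices whose entries are randomly zero or uniform
        SparseMatrix a = SparseMatrix.random(conf, 10, 10);
        SparseMatrix b = SparseMatrix.random(conf, 10, 10);

        // C = A*B; one map/reduce job is launched per row of A
        SparseMatrix c = a.mult(b);
        System.out.println("c(0,0) = " + c.get(0, 0));
      }
    }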
Added: incubator/hama/branches/hama-0.19/src/java/org/apache/hama/SparseVector.java
URL: http://svn.apache.org/viewvc/incubator/hama/branches/hama-0.19/src/java/org/apache/hama/SparseVector.java?rev=813233&view=auto
==============================================================================
--- incubator/hama/branches/hama-0.19/src/java/org/apache/hama/SparseVector.java (added)
+++ incubator/hama/branches/hama-0.19/src/java/org/apache/hama/SparseVector.java Thu Sep 10 05:32:52 2009
@@ -0,0 +1,185 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.hadoop.hbase.io.RowResult;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.MapWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hama.io.DoubleEntry;
+import org.apache.log4j.Logger;
+
+/**
+ * This class represents a sparse vector.
+ */
+public class SparseVector extends AbstractVector implements Vector {
+ static final Logger LOG = Logger.getLogger(SparseVector.class);
+
+ public SparseVector() {
+ this(new MapWritable());
+ }
+
+ public SparseVector(MapWritable m) {
+ this.entries = m;
+ }
+
+ public SparseVector(RowResult row) {
+ this.initMap(row);
+ }
+
+ @Override
+ public Vector add(double alpha, Vector v) {
+ if (alpha == 0)
+ return this;
+
+ for (Map.Entry<Writable, Writable> e : v.getEntries().entrySet()) {
+ // scale the incoming entry, then add the existing value if one is stored
+ double value = alpha * ((DoubleEntry) e.getValue()).getValue();
+ if (this.entries.containsKey(e.getKey())) {
+ value += this.get(((IntWritable) e.getKey()).get());
+ }
+ this.entries.put(e.getKey(), new DoubleEntry(value));
+ }
+
+ return this;
+ }
+
+ /**
+ * x = v + x
+ *
+ * @param v2
+ * @return x = v + x
+ */
+ public SparseVector add(Vector v2) {
+ for (Map.Entry<Writable, Writable> e : v2.getEntries().entrySet()) {
+ // add() already reads absent entries as zero, so one call covers both cases
+ int key = ((IntWritable) e.getKey()).get();
+ this.add(key, ((DoubleEntry) e.getValue()).getValue());
+ }
+
+ return this;
+ }
+
+ @Override
+ public double dot(Vector v) {
+ // TODO Auto-generated method stub
+ return 0;
+ }
+
+ @Override
+ public double norm(Norm type) {
+ // TODO Auto-generated method stub
+ return 0;
+ }
+
+ /**
+ * v = alpha*v
+ *
+ * @param alpha
+ * @return v = alpha*v
+ */
+ public SparseVector scale(double alpha) {
+ for (Map.Entry<Writable, Writable> e : this.entries.entrySet()) {
+ // update in place; the key set is unchanged
+ e.setValue(new DoubleEntry(((DoubleEntry) e.getValue()).getValue() * alpha));
+ }
+ return this;
+ }
+
+ /**
+ * Gets the value of index
+ *
+ * @param index
+ * @return the value of v(index)
+ */
+ public double get(int index) {
+ DoubleEntry entry = (DoubleEntry) this.entries.get(new IntWritable(index));
+ // absent entries read as zero instead of triggering a NullPointerException
+ return (entry != null) ? entry.getValue() : 0;
+ }
+
+ /**
+ * Sets the value of index
+ *
+ * @param index
+ * @param value
+ */
+ public void set(int index, double value) {
+ // If entries are null, create new object
+ if (this.entries == null) {
+ this.entries = new MapWritable();
+ }
+
+ if (value != 0) // only stores non-zero element
+ this.entries.put(new IntWritable(index), new DoubleEntry(value));
+ }
+
+ /**
+ * Adds the value to v(index)
+ *
+ * @param index
+ * @param value
+ */
+ public void add(int index, double value) {
+ set(index, get(index) + value);
+ }
+
+ /**
+ * Sets the vector
+ *
+ * @param v
+ * @return x = v
+ */
+ public SparseVector set(Vector v) {
+ this.entries = v.getEntries();
+ return this;
+ }
+
+ @Override
+ public Vector subVector(int i0, int i1) {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public Vector set(double alpha, Vector v) {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+}
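As an aside, a small sketch of the sparse-storage behaviour above (purely
illustrative; only non-zero entries are kept in the backing MapWritable):

    import org.apache.hama.SparseVector;

    public class SparseVectorExample {
      public static void main(String[] args) {
        SparseVector v = new SparseVector();
        v.set(3, 2.0);   // stored: non-zero
        v.set(7, 0.0);   // dropped: zeros are never stored
        v.add(3, 0.5);   // v(3) = 2.5
        v.scale(2.0);    // v(3) = 5.0

        System.out.println(v.get(3)); // 5.0
        System.out.println(v.get(7)); // 0.0, absent entries read as zero
      }
    }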
Added: incubator/hama/branches/hama-0.19/src/java/org/apache/hama/SubMatrix.java
URL: http://svn.apache.org/viewvc/incubator/hama/branches/hama-0.19/src/java/org/apache/hama/SubMatrix.java?rev=813233&view=auto
==============================================================================
--- incubator/hama/branches/hama-0.19/src/java/org/apache/hama/SubMatrix.java (added)
+++ incubator/hama/branches/hama-0.19/src/java/org/apache/hama/SubMatrix.java Thu Sep 10 05:32:52 2009
@@ -0,0 +1,222 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import org.apache.hama.util.BytesUtil;
+import org.apache.log4j.Logger;
+
+/**
+ * A sub matrix is a matrix formed by selecting certain rows and columns from a
+ * bigger matrix. This is an in-memory operation only.
+ */
+public class SubMatrix {
+ static final Logger LOG = Logger.getLogger(SubMatrix.class);
+ private double[][] matrix;
+
+ /**
+ * Constructor
+ *
+ * @param i the number of rows
+ * @param j the number of columns
+ */
+ public SubMatrix(int i, int j) {
+ this.matrix = new double[i][j];
+ }
+
+ /**
+ * Constructor
+ *
+ * @param c a two dimensional double array
+ */
+ public SubMatrix(double[][] c) {
+ this.matrix = c;
+ }
+
+ public SubMatrix(byte[] matrix) throws IOException {
+ ByteArrayInputStream bis = new ByteArrayInputStream(matrix);
+ DataInputStream dis = new DataInputStream(bis);
+
+ int rows = dis.readInt();
+ int columns = dis.readInt();
+ this.matrix = new double[rows][columns];
+
+ for(int i = 0; i < rows; i++) {
+ for(int j = 0; j < columns; j++) {
+ this.matrix[i][j] = dis.readDouble();
+ }
+ }
+
+ dis.close();
+ bis.close();
+ }
+
+ /**
+ * Sets the value
+ *
+ * @param row
+ * @param column
+ * @param value
+ */
+ public void set(int row, int column, double value) {
+ matrix[row][column] = value;
+ }
+
+ /**
+ * Sets the value
+ *
+ * @param row
+ * @param column
+ * @param value
+ */
+ public void set(int row, int column, byte[] value) {
+ matrix[row][column] = BytesUtil.bytesToDouble(value);
+ }
+
+ /**
+ * Gets the value
+ *
+ * @param i
+ * @param j
+ * @return the value of submatrix(i, j)
+ */
+ public double get(int i, int j) {
+ return matrix[i][j];
+ }
+
+ public void add(int row, int column, double value) {
+ matrix[row][column] += value;
+ }
+
+ /**
+ * c = a+b
+ *
+ * @param b
+ * @return c
+ */
+ public SubMatrix add(SubMatrix b) {
+ SubMatrix c = new SubMatrix(this.getRows(), this.getColumns());
+
+ for (int i = 0; i < this.getRows(); i++) {
+ for (int j = 0; j < this.getColumns(); j++) {
+ c.set(i, j, (this.get(i, j) + b.get(i, j)));
+ }
+ }
+
+ return c;
+ }
+
+ /**
+ * c = a*b
+ *
+ * @param b
+ * @return c
+ */
+ public SubMatrix mult(SubMatrix b) {
+ SubMatrix c = new SubMatrix(this.getRows(), b.getColumns());
+
+ for (int i = 0; i < this.getRows(); i++) {
+ for (int j = 0; j < b.getColumns(); j++) {
+ for (int k = 0; k < this.getColumns(); k++) {
+ c.add(i, j, this.get(i, k) * b.get(k, j));
+ }
+ }
+ }
+
+ return c;
+ }
+
+ /**
+ * Gets the number of rows
+ *
+ * @return the number of rows
+ */
+ public int getRows() {
+ return this.matrix.length;
+ }
+
+ /**
+ * Gets the number of columns
+ *
+ * @return the number of columns
+ */
+ public int getColumns() {
+ return this.matrix[0].length;
+ }
+
+ /**
+ * Close
+ */
+ public void close() {
+ matrix = null;
+ }
+
+ /**
+ * @return the 2d double array
+ */
+ public double[][] getDoubleArray() {
+ return matrix;
+ }
+
+ /**
+ * Gets the bytes of the sub matrix
+ *
+ * @return the bytes of the sub matrix
+ * @throws IOException
+ */
+ public byte[] getBytes() throws IOException {
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ DataOutputStream dos = new DataOutputStream(bos);
+
+ dos.writeInt(this.getRows());
+ dos.writeInt(this.getColumns());
+
+ for(int i = 0; i < this.getRows(); i++) {
+ for(int j = 0; j < this.getColumns(); j++) {
+ dos.writeDouble(this.get(i, j));
+ }
+ }
+
+ byte[] data = bos.toByteArray();
+ dos.close();
+ bos.close();
+ return data;
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder result = new StringBuilder();
+ for (int i = 0; i < this.getRows(); i++) {
+ for (int j = 0; j < this.getColumns(); j++) {
+ result.append(this.get(i, j));
+ result.append('\t');
+ }
+ result.append('\n');
+ }
+ return result.toString();
+ }
+}
+
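A quick illustrative round trip through the byte serialization above (the
layout is: row count, column count, then row-major doubles):

    import org.apache.hama.SubMatrix;

    public class SubMatrixExample {
      public static void main(String[] args) throws Exception {
        SubMatrix a = new SubMatrix(new double[][] { { 1, 2 }, { 3, 4 } });
        SubMatrix b = new SubMatrix(new double[][] { { 5, 6 }, { 7, 8 } });

        // in-memory block product: c = a*b
        SubMatrix c = a.mult(b);                 // [[19, 22], [43, 50]]

        // serialize and restore through the row-major byte layout
        SubMatrix restored = new SubMatrix(c.getBytes());
        System.out.println(restored.get(1, 1));  // 50.0
      }
    }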
Added: incubator/hama/branches/hama-0.19/src/java/org/apache/hama/Vector.java
URL: http://svn.apache.org/viewvc/incubator/hama/branches/hama-0.19/src/java/org/apache/hama/Vector.java?rev=813233&view=auto
==============================================================================
--- incubator/hama/branches/hama-0.19/src/java/org/apache/hama/Vector.java (added)
+++ incubator/hama/branches/hama-0.19/src/java/org/apache/hama/Vector.java Thu Sep 10 05:32:52 2009
@@ -0,0 +1,161 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama;
+
+import java.util.Iterator;
+
+import org.apache.hadoop.io.MapWritable;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * Basic vector interface.
+ */
+public interface Vector {
+
+ /**
+ * Size of the vector
+ *
+ * @return size of the vector
+ */
+ public int size();
+
+ /**
+ * Gets the value of index
+ *
+ * @param index
+ * @return v(index)
+ */
+ public double get(int index);
+
+ /**
+ * Sets the value of index
+ *
+ * @param index
+ * @param value
+ */
+ public void set(int index, double value);
+
+ /**
+ * Sets the vector
+ *
+ * @param v
+ * @return x = v
+ */
+ public Vector set(Vector v);
+
+ /**
+ * x = alpha * v
+ *
+ * @param alpha
+ * @param v
+ * @return x = alpha * v
+ */
+ public Vector set(double alpha, Vector v);
+
+ /**
+ * Adds the value to v(index)
+ *
+ * @param index
+ * @param value
+ */
+ public void add(int index, double value);
+
+ /**
+ * x = alpha*v + x
+ *
+ * @param alpha
+ * @param v
+ * @return x = alpha*v + x
+ */
+ public Vector add(double alpha, Vector v);
+
+ /**
+ * x = v + x
+ *
+ * @param v
+ * @return x = v + x
+ */
+ public Vector add(Vector v);
+
+ /**
+ * x dot v
+ *
+ * @param v
+ * @return x dot v
+ */
+ public double dot(Vector v);
+
+ /**
+ * v = alpha*v
+ *
+ * @param alpha
+ * @return v = alpha*v
+ */
+ public Vector scale(double alpha);
+
+ /**
+ * Returns a sub-vector.
+ *
+ * @param i0 the index of the first element
+ * @param i1 the index of the last element
+ * @return v[i0:i1]
+ */
+ public Vector subVector(int i0, int i1);
+
+ /**
+ * Computes the given norm of the vector
+ *
+ * @param type
+ * @return norm of the vector
+ */
+ public double norm(Norm type);
+
+ /**
+ * Supported vector-norms.
+ */
+ enum Norm {
+
+ /** Sum of the absolute values of the entries */
+ One,
+
+ /** The root of sum of squares */
+ Two,
+
+ /** The two norm, computed robustly against overflow/underflow */
+ TwoRobust,
+
+ /** Largest entry in absolute value */
+ Infinity
+ }
+
+ /**
+ * Returns an iterator
+ *
+ * @return iterator
+ */
+ public Iterator<Writable> iterator();
+
+ /**
+ * Returns the {@link org.apache.hadoop.io.MapWritable}
+ *
+ * @return the entries of vector
+ */
+ public MapWritable getEntries();
+}
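For reference, the standard definitions behind the Norm constants above
(TwoRobust denotes the same two norm, just computed to avoid
overflow/underflow; written in LaTeX for clarity):

    \|x\|_1      = \sum_i |x_i|          % Norm.One
    \|x\|_2      = \sqrt{\sum_i x_i^2}   % Norm.Two, Norm.TwoRobust
    \|x\|_\infty = \max_i |x_i|          % Norm.Infinity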
Added: incubator/hama/branches/hama-0.19/src/java/org/apache/hama/algebra/BlockMultiplyMap.java
URL: http://svn.apache.org/viewvc/incubator/hama/branches/hama-0.19/src/java/org/apache/hama/algebra/BlockMultiplyMap.java?rev=813233&view=auto
==============================================================================
--- incubator/hama/branches/hama-0.19/src/java/org/apache/hama/algebra/BlockMultiplyMap.java (added)
+++ incubator/hama/branches/hama-0.19/src/java/org/apache/hama/algebra/BlockMultiplyMap.java Thu Sep 10 05:32:52 2009
@@ -0,0 +1,62 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.algebra;
+
+import java.io.IOException;
+
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hama.Constants;
+import org.apache.hama.SubMatrix;
+import org.apache.hama.io.BlockID;
+import org.apache.hama.io.BlockWritable;
+import org.apache.hama.mapred.BlockInputFormat;
+import org.apache.log4j.Logger;
+
+public class BlockMultiplyMap extends MapReduceBase implements
+ Mapper<BlockID, BlockWritable, BlockID, BlockWritable> {
+ static final Logger LOG = Logger.getLogger(BlockMultiplyMap.class);
+
+ public static void initJob(String matrix_a,
+ Class<BlockMultiplyMap> map, Class<BlockID> outputKeyClass,
+ Class<BlockWritable> outputValueClass, JobConf jobConf) {
+
+ jobConf.setMapOutputValueClass(outputValueClass);
+ jobConf.setMapOutputKeyClass(outputKeyClass);
+ jobConf.setMapperClass(map);
+
+ jobConf.setInputFormat(BlockInputFormat.class);
+ FileInputFormat.addInputPaths(jobConf, matrix_a);
+
+ jobConf.set(BlockInputFormat.COLUMN_LIST, Constants.BLOCK);
+ }
+
+ @Override
+ public void map(BlockID key, BlockWritable value,
+ OutputCollector<BlockID, BlockWritable> output, Reporter reporter)
+ throws IOException {
+ SubMatrix c = value.get(0).mult(value.get(1));
+ output.collect(key, new BlockWritable(c));
+ }
+}
Added: incubator/hama/branches/hama-0.19/src/java/org/apache/hama/algebra/BlockMultiplyReduce.java
URL: http://svn.apache.org/viewvc/incubator/hama/branches/hama-0.19/src/java/org/apache/hama/algebra/BlockMultiplyReduce.java?rev=813233&view=auto
==============================================================================
--- incubator/hama/branches/hama-0.19/src/java/org/apache/hama/algebra/BlockMultiplyReduce.java (added)
+++ incubator/hama/branches/hama-0.19/src/java/org/apache/hama/algebra/BlockMultiplyReduce.java Thu Sep 10 05:32:52 2009
@@ -0,0 +1,86 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.algebra;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.hadoop.hbase.io.BatchUpdate;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hama.SubMatrix;
+import org.apache.hama.io.BlockID;
+import org.apache.hama.io.BlockWritable;
+import org.apache.hama.io.VectorUpdate;
+import org.apache.hama.mapred.VectorOutputFormat;
+import org.apache.log4j.Logger;
+
+public class BlockMultiplyReduce extends MapReduceBase implements
+ Reducer<BlockID, BlockWritable, IntWritable, VectorUpdate> {
+ static final Logger LOG = Logger.getLogger(BlockMultiplyReduce.class);
+
+ /**
+ * Use this before submitting a BlockMultiplyReduce job. It will
+ * appropriately set up the JobConf.
+ *
+ * @param table
+ * @param reducer
+ * @param job
+ */
+ public static void initJob(String table,
+ Class<BlockMultiplyReduce> reducer, JobConf job) {
+ job.setOutputFormat(VectorOutputFormat.class);
+ job.setReducerClass(reducer);
+ job.set(VectorOutputFormat.OUTPUT_TABLE, table);
+ job.setOutputKeyClass(IntWritable.class);
+ job.setOutputValueClass(BatchUpdate.class);
+ }
+
+ @Override
+ public void reduce(BlockID key, Iterator<BlockWritable> values,
+ OutputCollector<IntWritable, VectorUpdate> output, Reporter reporter)
+ throws IOException {
+
+ SubMatrix s = null;
+ while (values.hasNext()) {
+ SubMatrix b = values.next().getMatrices().next();
+ if (s == null) {
+ s = b;
+ } else {
+ s = s.add(b);
+ }
+ }
+
+ int startRow = key.getRow() * s.getRows();
+ int startColumn = key.getColumn() * s.getColumns();
+
+ for (int i = 0; i < s.getRows(); i++) {
+ VectorUpdate update = new VectorUpdate(i + startRow);
+ for (int j = 0; j < s.getColumns(); j++) {
+ update.put(j + startColumn, s.get(i, j));
+ }
+ output.collect(new IntWritable(key.getRow()), update);
+ }
+ }
+}
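The map/reduce pair above is plain block matrix multiplication: the map emits
one partial block product per input pair, and the reduce sums all partial
products that share a BlockID before writing the block back at its row/column
offset. In formula form, for blocks A_{ik} and B_{kj}:

    C_{ij} = \sum_k A_{ik} \, B_{kj}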
Added: incubator/hama/branches/hama-0.19/src/java/org/apache/hama/algebra/DenseMatrixVectorMultMap.java
URL: http://svn.apache.org/viewvc/incubator/hama/branches/hama-0.19/src/java/org/apache/hama/algebra/DenseMatrixVectorMultMap.java?rev=813233&view=auto
==============================================================================
--- incubator/hama/branches/hama-0.19/src/java/org/apache/hama/algebra/DenseMatrixVectorMultMap.java (added)
+++ incubator/hama/branches/hama-0.19/src/java/org/apache/hama/algebra/DenseMatrixVectorMultMap.java Thu Sep 10 05:32:52 2009
@@ -0,0 +1,85 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.algebra;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.MapWritable;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hama.Constants;
+import org.apache.hama.DenseMatrix;
+import org.apache.hama.DenseVector;
+import org.apache.hama.HamaConfiguration;
+import org.apache.hama.mapred.VectorInputFormat;
+import org.apache.log4j.Logger;
+
+public class DenseMatrixVectorMultMap extends MapReduceBase implements
+ Mapper<IntWritable, MapWritable, IntWritable, MapWritable> {
+ static final Logger LOG = Logger.getLogger(DenseMatrixVectorMultMap.class);
+ protected DenseVector currVector;
+ public static final String ITH_ROW = "ith.row";
+ public static final String MATRIX_A = "hama.multiplication.matrix.a";
+ public static final String MATRIX_B = "hama.multiplication.matrix.b";
+ private IntWritable nKey = new IntWritable();
+
+ public void configure(JobConf job) {
+ DenseMatrix matrix_a;
+ try {
+ matrix_a = new DenseMatrix(new HamaConfiguration(job), job.get(MATRIX_A, ""));
+ int ithRow = job.getInt(ITH_ROW, 0);
+ nKey.set(ithRow);
+ currVector = matrix_a.getRow(ithRow);
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+
+ public static void initJob(int i, String matrix_a, String matrix_b,
+ Class<DenseMatrixVectorMultMap> map, Class<IntWritable> outputKeyClass,
+ Class<MapWritable> outputValueClass, JobConf jobConf) {
+
+ jobConf.setMapOutputValueClass(outputValueClass);
+ jobConf.setMapOutputKeyClass(outputKeyClass);
+ jobConf.setMapperClass(map);
+ jobConf.setInt(ITH_ROW, i);
+ jobConf.set(MATRIX_A, matrix_a);
+ jobConf.set(MATRIX_B, matrix_b);
+
+ jobConf.setInputFormat(VectorInputFormat.class);
+ FileInputFormat.addInputPaths(jobConf, matrix_b);
+ jobConf.set(VectorInputFormat.COLUMN_LIST, Constants.COLUMN);
+ }
+
+ @Override
+ public void map(IntWritable key, MapWritable value,
+ OutputCollector<IntWritable, MapWritable> output, Reporter reporter)
+ throws IOException {
+
+ DenseVector scaled = new DenseVector(value).scale(currVector.get(key.get()));
+ output.collect(nKey, scaled.getEntries());
+
+ }
+}
\ No newline at end of file
Added: incubator/hama/branches/hama-0.19/src/java/org/apache/hama/algebra/DenseMatrixVectorMultReduce.java
URL: http://svn.apache.org/viewvc/incubator/hama/branches/hama-0.19/src/java/org/apache/hama/algebra/DenseMatrixVectorMultReduce.java?rev=813233&view=auto
==============================================================================
--- incubator/hama/branches/hama-0.19/src/java/org/apache/hama/algebra/DenseMatrixVectorMultReduce.java (added)
+++ incubator/hama/branches/hama-0.19/src/java/org/apache/hama/algebra/DenseMatrixVectorMultReduce.java Thu Sep 10 05:32:52 2009
@@ -0,0 +1,81 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.algebra;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.hadoop.hbase.io.BatchUpdate;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.MapWritable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hama.DenseVector;
+import org.apache.hama.io.VectorUpdate;
+import org.apache.hama.mapred.VectorOutputFormat;
+import org.apache.log4j.Logger;
+
+public class DenseMatrixVectorMultReduce extends MapReduceBase implements
+ Reducer<IntWritable, MapWritable, IntWritable, VectorUpdate> {
+ static final Logger LOG = Logger.getLogger(DenseMatrixVectorMultReduce.class);
+
+ /**
+ * Use this before submitting a TableReduce job. It will appropriately set up
+ * the JobConf.
+ *
+ * @param table
+ * @param reducer
+ * @param job
+ */
+ public static void initJob(String table,
+ Class<DenseMatrixVectorMultReduce> reducer, JobConf job) {
+ job.setOutputFormat(VectorOutputFormat.class);
+ job.setReducerClass(reducer);
+ job.set(VectorOutputFormat.OUTPUT_TABLE, table);
+ job.setOutputKeyClass(IntWritable.class);
+ job.setOutputValueClass(BatchUpdate.class);
+ }
+
+ @Override
+ public void reduce(IntWritable key, Iterator<MapWritable> values,
+ OutputCollector<IntWritable, VectorUpdate> output, Reporter reporter)
+ throws IOException {
+ DenseVector sum = new DenseVector();
+
+ while (values.hasNext()) {
+ DenseVector nVector = new DenseVector(values.next());
+ // zero-fill on first use, then accumulate
+ if (sum.size() == 0) {
+ sum.zeroFill(nVector.size());
+ }
+ sum.add(nVector);
+ }
+
+ VectorUpdate update = new VectorUpdate(key.get());
+ update.putAll(sum.getEntries());
+
+ output.collect(key, update);
+ }
+
+}
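Taken together, DenseMatrixVectorMultMap and this reducer compute one row of
the product per job: the map scales row k of B by the scalar A(i,k), and the
reduce sums the scaled rows into row i of C:

    C_{i*} = \sum_k A_{ik} \, B_{k*}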
Added: incubator/hama/branches/hama-0.19/src/java/org/apache/hama/algebra/JacobiEigenValue.java
URL: http://svn.apache.org/viewvc/incubator/hama/branches/hama-0.19/src/java/org/apache/hama/algebra/JacobiEigenValue.java?rev=813233&view=auto
==============================================================================
--- incubator/hama/branches/hama-0.19/src/java/org/apache/hama/algebra/JacobiEigenValue.java (added)
+++ incubator/hama/branches/hama-0.19/src/java/org/apache/hama/algebra/JacobiEigenValue.java Thu Sep 10 05:32:52 2009
@@ -0,0 +1,583 @@
+package org.apache.hama.algebra;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.UnknownScannerException;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.io.Cell;
+import org.apache.hadoop.hbase.io.RowResult;
+import org.apache.hadoop.hbase.mapred.TableSplit;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.MapWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobConfigurable;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hama.Constants;
+import org.apache.hama.io.DoubleEntry;
+import org.apache.hama.io.Pair;
+import org.apache.hama.io.VectorUpdate;
+import org.apache.hama.mapred.HTableInputFormatBase;
+import org.apache.hama.mapred.HTableRecordReaderBase;
+import org.apache.hama.util.BytesUtil;
+
+/**
+ * A catalog class collecting all the map/reduce classes used to compute a
+ * matrix's eigenvalues
+ */
+public class JacobiEigenValue {
+
+ /** a copy of the original matrix, kept in the "eicol" column family */
+ public static final String EICOL = "eicol:";
+ /** a column family collecting all values and statuses used during computation */
+ public static final String EI = "eival:";
+ /** a column collecting all the eigenvalues */
+ public static final String EIVAL = EI + "value";
+ /** a column indicating whether the eigenvalues have changed */
+ public static final String EICHANGED = EI + "changed";
+ /** a column holding the index of the maximum absolute value in each row */
+ public static final String EIIND = EI + "ind";
+ /** a matrix collecting all the eigenvectors */
+ public static final String EIVEC = "eivec:";
+ public static final String MATRIX = "hama.jacobieigenvalue.matrix";
+ /** parameters for the pivot */
+ public static final String PIVOTROW = "hama.jacobi.pivot.row";
+ public static final String PIVOTCOL = "hama.jacobi.pivot.col";
+ public static final String PIVOTSIN = "hama.jacobi.pivot.sin";
+ public static final String PIVOTCOS = "hama.jacobi.pivot.cos";
+
+ static final Log LOG = LogFactory.getLog(JacobiEigenValue.class);
+
+ /**
+ * The matrix will be modified while computing eigenvalues, so a new copy is
+ * created to keep the original matrix unmodified. To reduce network
+ * transfer, we copy the "column" family of the original matrix into an
+ * "eicol" family; all subsequent modifications are made on the "eicol"
+ * family.
+ *
+ * The output eigenvector array "eivec", the output eigenvalue array
+ * "eival:value", and the temporary status arrays "eival:changed" and
+ * "eival:ind" are also created.
+ *
+ * "eival:state" records the rotation state of the matrix.
+ */
+ public static class InitMapper extends MapReduceBase implements
+ Mapper<IntWritable, MapWritable, NullWritable, NullWritable> {
+
+ HTable table;
+
+ @Override
+ public void configure(JobConf job) {
+ String tableName = job.get(MATRIX, "");
+ try {
+ table = new HTable(new HBaseConfiguration(job), tableName);
+ } catch (IOException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ }
+
+ @Override
+ public void map(IntWritable key, MapWritable value,
+ OutputCollector<NullWritable, NullWritable> collector, Reporter reporter)
+ throws IOException {
+ int row, col;
+ row = key.get();
+ VectorUpdate vu = new VectorUpdate(row);
+
+ double val;
+ double maxVal = Double.MIN_VALUE;
+ int maxInd = row + 1;
+
+ boolean init = true;
+ for (Map.Entry<Writable, Writable> e : value.entrySet()) {
+ val = ((DoubleEntry) e.getValue()).getValue();
+ col = ((IntWritable) e.getKey()).get();
+ // copy the original matrix to "EICOL" family
+ vu.put(JacobiEigenValue.EICOL, col, val);
+ // make the "EIVEC" a dialog matrix
+ vu.put(JacobiEigenValue.EIVEC, col, col == row ? 1 : 0);
+ if (col == row) {
+ vu.put(JacobiEigenValue.EIVAL, val);
+ }
+ // find the max index
+ if (col > row) {
+ if (init) {
+ maxInd = col;
+ maxVal = val;
+ init = false;
+ } else {
+ if (Math.abs(val) > Math.abs(maxVal)) {
+ maxVal = val;
+ maxInd = col;
+ }
+ }
+ }
+ }
+ // index array
+ vu.put(JacobiEigenValue.EIIND, maxInd);
+ // the changed flag is set to true (1) during initialization
+ vu.put(JacobiEigenValue.EICHANGED, 1);
+
+ table.commit(vu.getBatchUpdate());
+ }
+
+ }
+
+ /**
+ * PivotInputFormat, PivotMapper, and PivotReducer are used to find the pivot
+ * of a matrix
+ */
+ public static class PivotInputFormat extends HTableInputFormatBase implements
+ InputFormat<Pair, DoubleWritable>, JobConfigurable {
+
+ private PivotRecordReader tableRecordReader;
+
+ protected static class PivotRecordReader extends HTableRecordReaderBase
+ implements RecordReader<Pair, DoubleWritable> {
+
+ private int totalRows;
+ private int processedRows;
+ private int size;
+ boolean mocked = true;
+
+ @Override
+ public void init() throws IOException {
+ super.init();
+
+ Cell rows = htable.get(Constants.METADATA, Constants.METADATA_ROWS);
+ size = (rows != null) ? BytesUtil.bytesToInt(rows.getValue()) : 0;
+
+ if (endRow.length == 0) { // the last split, we don't know the end row
+ totalRows = 0; // so we just skip it.
+ } else {
+ if (startRow.length == 0) { // the first split, start row is 0
+ totalRows = BytesUtil.bytesToInt(endRow);
+ } else {
+ totalRows = BytesUtil.bytesToInt(endRow)
+ - BytesUtil.bytesToInt(startRow);
+ }
+ }
+ processedRows = 0;
+ LOG.info("Split (" + Bytes.toString(startRow) + ", "
+ + Bytes.toString(endRow) + ") -> " + totalRows);
+ }
+
+ /**
+ * @return Pair
+ *
+ * @see org.apache.hadoop.mapred.RecordReader#createKey()
+ */
+ public Pair createKey() {
+ return new Pair();
+ }
+
+ /**
+ * @return DoubleWritable
+ *
+ * @see org.apache.hadoop.mapred.RecordReader#createValue()
+ */
+ public DoubleWritable createValue() {
+ return new DoubleWritable();
+ }
+
+ /**
+ * @param key Pair as input key.
+ * @param value DoubleWritable as input value
+ *
+ * Converts Scanner.next() to Pair, DoubleWritable
+ *
+ * @return true if there was more data
+ * @throws IOException
+ */
+ public boolean next(Pair key, DoubleWritable value) throws IOException {
+ RowResult result;
+ try {
+ result = this.scanner.next();
+ } catch (UnknownScannerException e) {
+ LOG.debug("recovered from " + StringUtils.stringifyException(e));
+ restart(lastRow);
+ this.scanner.next(); // skip presumed already mapped row
+ result = this.scanner.next();
+ }
+
+ boolean hasMore = result != null && result.size() > 0;
+ if (hasMore) {
+ byte[] row = result.getRow();
+ int rowId = BytesUtil.bytesToInt(row);
+ if (rowId == size - 1) { // skip the last row
+ if (mocked) {
+ key.set(Integer.MAX_VALUE, Integer.MAX_VALUE);
+ mocked = false;
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ byte[] col = result.get(EIIND).getValue();
+ int colId = BytesUtil.bytesToInt(col);
+ double val = 0;
+
+ // get (rowId, colId)'s value
+ Cell cell = htable.get(BytesUtil.getRowIndex(rowId), Bytes
+ .toBytes(EICOL + colId));
+ if (cell != null && cell.getValue() != null) {
+ val = BytesUtil.bytesToDouble(cell.getValue());
+ }
+
+ key.set(rowId, colId);
+ value.set(val);
+
+ lastRow = row;
+ processedRows++;
+ } else {
+ if (mocked) {
+ key.set(Integer.MAX_VALUE, Integer.MAX_VALUE);
+ mocked = false;
+ return true;
+ } else {
+ return false;
+ }
+ }
+ return hasMore;
+ }
+
+ @Override
+ public float getProgress() {
+ if (totalRows <= 0) {
+ return 0;
+ } else {
+ return Math.min(1.0f, processedRows / (float) totalRows);
+ }
+ }
+
+ }
+
+ @Override
+ public RecordReader<Pair, DoubleWritable> getRecordReader(InputSplit split,
+ JobConf conf, Reporter reporter) throws IOException {
+ TableSplit tSplit = (TableSplit) split;
+ PivotRecordReader trr = this.tableRecordReader;
+ // if no table record reader was provided use default
+ if (trr == null) {
+ trr = new PivotRecordReader();
+ }
+ trr.setStartRow(tSplit.getStartRow());
+ trr.setEndRow(tSplit.getEndRow());
+ trr.setHTable(this.table);
+ trr.setInputColumns(this.inputColumns);
+ trr.setRowFilter(this.rowFilter);
+ trr.init();
+ return trr;
+ }
+
+ protected void setTableRecordReader(PivotRecordReader tableRecordReader) {
+ this.tableRecordReader = tableRecordReader;
+ }
+
+ }
+
+ // find the pivot of the matrix
+ public static class PivotMapper extends MapReduceBase implements
+ Mapper<Pair, DoubleWritable, Pair, DoubleWritable> {
+
+ private double max = 0;
+ private Pair pair = new Pair(0, 0);
+ private Pair dummyPair = new Pair(Integer.MAX_VALUE, Integer.MAX_VALUE);
+ private DoubleWritable dummyVal = new DoubleWritable(0.0);
+
+ @Override
+ public void map(Pair key, DoubleWritable value,
+ OutputCollector<Pair, DoubleWritable> collector, Reporter reporter)
+ throws IOException {
+ if (key.getRow() != Integer.MAX_VALUE) {
+ if (Math.abs(value.get()) > Math.abs(max)) {
+ pair.set(key.getRow(), key.getColumn());
+ max = value.get();
+ }
+ } else {
+ collector.collect(pair, new DoubleWritable(max));
+ collector.collect(dummyPair, dummyVal);
+ }
+ }
+
+ }
+
+ public static class PivotReducer extends MapReduceBase implements
+ Reducer<Pair, DoubleWritable, Pair, DoubleWritable> {
+
+ private double max = 0;
+ private Pair pair = new Pair(0, 0);
+
+ @Override
+ public void reduce(Pair key, Iterator<DoubleWritable> values,
+ OutputCollector<Pair, DoubleWritable> collector, Reporter reporter)
+ throws IOException {
+ double val;
+ if (key.getRow() != Integer.MAX_VALUE) {
+ val = values.next().get();
+ if (Math.abs(val) > Math.abs(max)) {
+ pair.set(key.getRow(), key.getColumn());
+ max = val;
+ }
+ } else {
+ collector.collect(pair, new DoubleWritable(max));
+ }
+ }
+
+ }
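The pivot stage above is a distributed argmax over the strictly
upper-triangular part of the matrix: each split's PivotRecordReader emits the
per-row maxima recorded under "eival:ind", and the mapper and reducer keep the
global winner,

    (p, q) = \arg\max_{i < j} |a_{ij}|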
+
+ /**
+ * Tricky here! We rotate the matrix while we scan it, writing the updates
+ * back as we go, so we only need a RotationRecordReader to scan the matrix
+ * and perform the rotation; the mapper and reducer are just dummies.
+ */
+ public static class RotationInputFormat extends HTableInputFormatBase
+ implements InputFormat<NullWritable, NullWritable>, JobConfigurable {
+
+ private RotationRecordReader tableRecordReader;
+
+ int pivot_row, pivot_col;
+ double pivot_cos, pivot_sin;
+
+ public void configure(JobConf job) {
+ super.configure(job);
+ pivot_row = job.getInt(PIVOTROW, -1);
+ pivot_col = job.getInt(PIVOTCOL, -1);
+ pivot_sin = Double.parseDouble(job.get(PIVOTSIN));
+ pivot_cos = Double.parseDouble(job.get(PIVOTCOS));
+ }
+
+ protected static class RotationRecordReader extends HTableRecordReaderBase
+ implements RecordReader<NullWritable, NullWritable> {
+
+ private int totalRows;
+ private int processedRows;
+ int startRowId, endRowId = -1;
+ int size;
+
+ int pivotrow, pivotcol;
+ byte[] prow, pcol;
+ double pivotcos, pivotsin;
+
+ public RotationRecordReader(int pr, int pc, double psin, double pcos) {
+ super();
+ pivotrow = pr;
+ pivotcol = pc;
+ pivotsin = psin;
+ pivotcos = pcos;
+ prow = Bytes.toBytes(pivotrow);
+ pcol = Bytes.toBytes(pivotcol);
+ // log the pivot indices (logging the raw byte arrays would be unreadable)
+ LOG.info("pivot row: " + pivotrow);
+ LOG.info("pivot col: " + pivotcol);
+ }
+
+ @Override
+ public void init() throws IOException {
+ super.init();
+
+ Cell rows = htable.get(Constants.METADATA, Constants.METADATA_ROWS);
+ size = (rows != null) ? BytesUtil.bytesToInt(rows.getValue()) : 0;
+
+ if (endRow.length == 0) { // the last split, we don't know the end row
+ totalRows = 0; // so we just skip it.
+ if (startRow.length == 0)
+ startRowId = 0;
+ else
+ startRowId = BytesUtil.bytesToInt(startRow);
+ endRowId = -1;
+ } else {
+ if (startRow.length == 0) { // the first split, start row is 0
+ totalRows = BytesUtil.bytesToInt(endRow);
+ startRowId = 0;
+ endRowId = totalRows;
+ } else {
+ startRowId = BytesUtil.bytesToInt(startRow);
+ endRowId = BytesUtil.bytesToInt(endRow);
+ totalRows = endRowId - startRowId;
+ }
+ }
+ processedRows = 0;
+ LOG.info("Split (" + startRowId + ", " + endRowId + ") -> " + totalRows);
+ }
+
+ /**
+ * @return NullWritable
+ *
+ * @see org.apache.hadoop.mapred.RecordReader#createKey()
+ */
+ public NullWritable createKey() {
+ return NullWritable.get();
+ }
+
+ /**
+ * @return NullWritable
+ *
+ * @see org.apache.hadoop.mapred.RecordReader#createValue()
+ */
+ public NullWritable createValue() {
+ return NullWritable.get();
+ }
+
+ /**
+ * @param key NullWritable as input key.
+ * @param value NullWritable as input value
+ *
+ * Converts Scanner.next() to NullWritable, NullWritable
+ *
+ * @return true if there was more data
+ * @throws IOException
+ */
+ public boolean next(NullWritable key, NullWritable value)
+ throws IOException {
+ RowResult result;
+ try {
+ result = this.scanner.next();
+ } catch (UnknownScannerException e) {
+ LOG.debug("recovered from " + StringUtils.stringifyException(e));
+ restart(lastRow);
+ this.scanner.next(); // skip presumed already mapped row
+ result = this.scanner.next();
+ }
+
+ double s1, s2;
+ VectorUpdate bu;
+ boolean hasMore = result != null && result.size() > 0;
+ if (hasMore) {
+ byte[] row = result.getRow();
+ int rowId = BytesUtil.bytesToInt(row);
+ if (rowId < pivotrow) {
+ s1 = BytesUtil.bytesToDouble(htable.get(
+ BytesUtil.getRowIndex(rowId),
+ Bytes.toBytes(JacobiEigenValue.EICOL + pivotrow)).getValue());
+ s2 = BytesUtil.bytesToDouble(htable.get(
+ BytesUtil.getRowIndex(rowId),
+ Bytes.toBytes(JacobiEigenValue.EICOL + pivotcol)).getValue());
+
+ bu = new VectorUpdate(rowId);
+ bu.put(EICOL, pivotrow, pivotcos * s1 - pivotsin * s2);
+ bu.put(EICOL, pivotcol, pivotsin * s1 + pivotcos * s2);
+
+ htable.commit(bu.getBatchUpdate());
+ } else if (rowId == pivotrow) {
+ return true;
+ } else if (rowId < pivotcol) {
+ s1 = BytesUtil.bytesToDouble(htable.get(
+ BytesUtil.getRowIndex(pivotrow), Bytes.toBytes(EICOL + rowId))
+ .getValue());
+ s2 = BytesUtil.bytesToDouble(htable.get(
+ BytesUtil.getRowIndex(rowId), Bytes.toBytes(EICOL + pivotcol))
+ .getValue());
+
+ bu = new VectorUpdate(rowId);
+ bu.put(EICOL, pivotcol, pivotsin * s1 + pivotcos * s2);
+ htable.commit(bu.getBatchUpdate());
+
+ bu = new VectorUpdate(pivotrow);
+ bu.put(EICOL, rowId, pivotcos * s1 - pivotsin * s2);
+ htable.commit(bu.getBatchUpdate());
+ } else if (rowId == pivotcol) {
+ for (int i = pivotcol + 1; i < size; i++) {
+ s1 = BytesUtil.bytesToDouble(htable.get(
+ BytesUtil.getRowIndex(pivotrow), Bytes.toBytes(EICOL + i))
+ .getValue());
+ s2 = BytesUtil.bytesToDouble(htable.get(
+ BytesUtil.getRowIndex(pivotcol), Bytes.toBytes(EICOL + i))
+ .getValue());
+
+ bu = new VectorUpdate(pivotcol);
+ bu.put(EICOL, i, pivotsin * s1 + pivotcos * s2);
+ htable.commit(bu.getBatchUpdate());
+
+ bu = new VectorUpdate(pivotrow);
+ bu.put(EICOL, i, pivotcos * s1 - pivotsin * s2);
+ htable.commit(bu.getBatchUpdate());
+ }
+ } else { // rowId > pivotcol
+ return false;
+ }
+
+ lastRow = row;
+ processedRows++;
+ }
+ return hasMore;
+ }
+
+ @Override
+ public float getProgress() {
+ if (totalRows <= 0) {
+ return 0;
+ } else {
+ return Math.min(1.0f, processedRows / (float) totalRows);
+ }
+ }
+
+ }
+
+ public InputSplit[] getSplits(JobConf job, int numSplits)
+ throws IOException {
+ InputSplit[] splits = super.getSplits(job, numSplits);
+ List<InputSplit> newSplits = new ArrayList<InputSplit>();
+ for (InputSplit split : splits) {
+ TableSplit ts = (TableSplit) split;
+ byte[] row = ts.getStartRow();
+ if (row.length == 0) // the first split
+ newSplits.add(split);
+ else {
+ if (BytesUtil.bytesToInt(ts.getStartRow()) < pivot_col) {
+ newSplits.add(split);
+ }
+ }
+ }
+
+ return newSplits.toArray(new InputSplit[newSplits.size()]);
+ }
+
+ @Override
+ public RecordReader<NullWritable, NullWritable> getRecordReader(
+ InputSplit split, JobConf conf, Reporter reporter) throws IOException {
+ TableSplit tSplit = (TableSplit) split;
+ RotationRecordReader trr = this.tableRecordReader;
+ // if no table record reader was provided use default
+ if (trr == null) {
+ trr = new RotationRecordReader(pivot_row, pivot_col, pivot_sin,
+ pivot_cos);
+ }
+ trr.setStartRow(tSplit.getStartRow());
+ trr.setEndRow(tSplit.getEndRow());
+ trr.setHTable(this.table);
+ trr.setInputColumns(this.inputColumns);
+ trr.setRowFilter(this.rowFilter);
+ trr.init();
+ return trr;
+ }
+
+ protected void setTableRecordReader(RotationRecordReader tableRecordReader) {
+ this.tableRecordReader = tableRecordReader;
+ }
+
+ }
+}
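For orientation, the rotation applied by RotationRecordReader is the classical
Jacobi (Givens) update: with c = cos(theta) and s = sin(theta) taken from the
job parameters, each affected pair of entries (s1, s2) is replaced by

    s_1' = c\,s_1 - s\,s_2, \qquad s_2' = s\,s_1 + c\,s_2

which matches the pivotcos * s1 - pivotsin * s2 and
pivotsin * s1 + pivotcos * s2 expressions in the code above.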
Added: incubator/hama/branches/hama-0.19/src/java/org/apache/hama/algebra/MatrixNormMap.java
URL: http://svn.apache.org/viewvc/incubator/hama/branches/hama-0.19/src/java/org/apache/hama/algebra/MatrixNormMap.java?rev=813233&view=auto
==============================================================================
--- incubator/hama/branches/hama-0.19/src/java/org/apache/hama/algebra/MatrixNormMap.java (added)
+++ incubator/hama/branches/hama-0.19/src/java/org/apache/hama/algebra/MatrixNormMap.java Thu Sep 10 05:32:52 2009
@@ -0,0 +1,70 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.algebra;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.MapWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hama.Constants;
+import org.apache.hama.io.DoubleEntry;
+import org.apache.hama.mapred.VectorInputFormat;
+
+public class MatrixNormMap extends MapReduceBase implements
+ Mapper<IntWritable, MapWritable, IntWritable, DoubleWritable> {
+ private IntWritable nKey = new IntWritable(-1);
+ private DoubleWritable nValue = new DoubleWritable();
+
+ public static void initJob(String path, Class<MatrixNormMap> map,
+ Class<IntWritable> outputKeyClass, Class<DoubleWritable> outputValueClass,
+ JobConf jobConf) {
+ jobConf.setMapOutputValueClass(outputValueClass);
+ jobConf.setMapOutputKeyClass(outputKeyClass);
+ jobConf.setMapperClass(map);
+
+ jobConf.setInputFormat(VectorInputFormat.class);
+ FileInputFormat.addInputPaths(jobConf, path);
+ jobConf.set(VectorInputFormat.COLUMN_LIST, Constants.COLUMN);
+ }
+
+ @Override
+ public void map(IntWritable key, MapWritable value,
+ OutputCollector<IntWritable, DoubleWritable> output, Reporter reporter)
+ throws IOException {
+
+ double rowSum = 0;
+ for(Map.Entry<Writable, Writable> e : value.entrySet()) {
+ rowSum += Math.abs(((DoubleEntry) e.getValue()).getValue());
+ }
+ nValue.set(rowSum);
+
+ output.collect(nKey, nValue);
+ }
+
+}
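
For context, a minimal driver sketch showing how MatrixNormMap and
MatrixNormReduce are wired together to compute the infinity norm (the map
emits each row's absolute sum, the reduce keeps the maximum); the table name,
output directory, and configuration below are placeholders, not part of the
committed code:

    HamaConfiguration conf = new HamaConfiguration();
    JobConf jobConf = new JobConf(conf, MatrixNormMap.class);
    jobConf.setJobName("matrix infinity norm");
    // every map emits under the single key -1, and close() writes one file,
    // so a single reducer is required
    jobConf.setNumReduceTasks(1);
    MatrixNormMap.initJob("ExampleMatrixTable", MatrixNormMap.class,
        IntWritable.class, DoubleWritable.class, jobConf);
    MatrixNormReduce.initJob("norm_output", MatrixNormReduce.class, jobConf);
    // SequenceFileOutputFormat still needs a framework-level output path
    FileOutputFormat.setOutputPath(jobConf, new Path("norm_output"));
    JobClient.runJob(jobConf);
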
Added: incubator/hama/branches/hama-0.19/src/java/org/apache/hama/algebra/MatrixNormMapRed.java
URL: http://svn.apache.org/viewvc/incubator/hama/branches/hama-0.19/src/java/org/apache/hama/algebra/MatrixNormMapRed.java?rev=813233&view=auto
==============================================================================
--- incubator/hama/branches/hama-0.19/src/java/org/apache/hama/algebra/MatrixNormMapRed.java (added)
+++ incubator/hama/branches/hama-0.19/src/java/org/apache/hama/algebra/MatrixNormMapRed.java Thu Sep 10 05:32:52 2009
@@ -0,0 +1,325 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hama.algebra;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.MapWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.SequenceFileOutputFormat;
+import org.apache.hama.Constants;
+import org.apache.hama.io.DoubleEntry;
+import org.apache.hama.mapred.VectorInputFormat;
+
+/** A catalog class collecting all the MapReduce classes used to compute a matrix's norms */
+public class MatrixNormMapRed {
+
+ /**
+ * Initialize the job to compute the matrix's norm
+ *
+ * @param inputMatrixPath the input matrix's path
+ * @param outputPath path of the output file that will record the matrix's
+ * norm
+ * @param mapper Mapper
+ * @param combiner Combiner
+ * @param reducer Reducer
+ * @param jobConf Configuration of the job
+ */
+ public static void initJob(String inputMatrixPath, String outputPath,
+ Class<? extends MatrixNormMapper> mapper,
+ Class<? extends MatrixNormReducer> combiner,
+ Class<? extends MatrixNormReducer> reducer, JobConf jobConf) {
+ jobConf.setMapperClass(mapper);
+ jobConf.setMapOutputKeyClass(IntWritable.class);
+ jobConf.setMapOutputValueClass(DoubleWritable.class);
+ jobConf.setCombinerClass(combiner);
+ jobConf.setReducerClass(reducer);
+ jobConf.setOutputKeyClass(IntWritable.class);
+ jobConf.setOutputValueClass(DoubleWritable.class);
+
+ // input
+ jobConf.setInputFormat(VectorInputFormat.class);
+ jobConf.set(VectorInputFormat.COLUMN_LIST, Constants.COLUMN);
+ FileInputFormat.addInputPaths(jobConf, inputMatrixPath);
+ // output
+ jobConf.setOutputFormat(SequenceFileOutputFormat.class);
+ FileOutputFormat.setOutputPath(jobConf, new Path(outputPath));
+ }
+
+ /** The common interface for norm mappers */
+ public static interface MatrixNormMapper extends
+ Mapper<IntWritable, MapWritable, IntWritable, DoubleWritable> {
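+ // Fields declared in an interface are implicitly static and final; sharing
+ // these writables is safe because each task runs single-threaded and only
+ // the objects' contents are mutated.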
+ IntWritable nKey = new IntWritable(-1);
+ DoubleWritable nValue = new DoubleWritable(0);
+ }
+
+ /** The common interface for norm reducers and combiners */
+ public static interface MatrixNormReducer extends
+ Reducer<IntWritable, DoubleWritable, IntWritable, DoubleWritable> {
+ IntWritable nKey = new IntWritable(-1);
+ DoubleWritable nValue = new DoubleWritable(0);
+ }
+
+ // /
+ // / Infinity Norm
+ // /
+
+ /** Infinity Norm Mapper */
+ public static class MatrixInfinityNormMapper extends MapReduceBase implements
+ MatrixNormMapper {
+
+ @Override
+ public void map(IntWritable key, MapWritable value,
+ OutputCollector<IntWritable, DoubleWritable> output, Reporter reporter)
+ throws IOException {
+
+ double rowSum = 0;
+ for (Map.Entry<Writable, Writable> e : value.entrySet()) {
+ rowSum += Math.abs(((DoubleEntry) e.getValue()).getValue());
+ }
+ nValue.set(rowSum);
+
+ output.collect(nKey, nValue);
+ }
+
+ }
+
+ /**
+ * Matrix Infinity Norm Reducer
+ */
+ public static class MatrixInfinityNormReducer extends MapReduceBase implements
+ MatrixNormReducer {
+
+ private double max = 0;
+
+ @Override
+ public void reduce(IntWritable key, Iterator<DoubleWritable> values,
+ OutputCollector<IntWritable, DoubleWritable> output, Reporter reporter)
+ throws IOException {
+
+ while (values.hasNext()) {
+ max = Math.max(values.next().get(), max);
+ }
+
+ // Note: every row sum is emitted under the single key (-1), so this
+ // reducer sees exactly one key and emits the maximum row sum here.
+ nValue.set(max);
+ output.collect(nKey, nValue);
+ }
+
+ }
+
+ // /
+ // / One Norm
+ // /
+
+ /** One Norm Mapper */
+ public static class MatrixOneNormMapper extends MapReduceBase implements
+ MatrixNormMapper {
+
+ @Override
+ public void map(IntWritable key, MapWritable value,
+ OutputCollector<IntWritable, DoubleWritable> output, Reporter reporter)
+ throws IOException {
+
+ for (Map.Entry<Writable, Writable> e : value.entrySet()) {
+ nValue.set(((DoubleEntry) e.getValue()).getValue());
+ output.collect((IntWritable) e.getKey(), nValue);
+ }
+ }
+ }
+
+ /** One Norm Combiner */
+ public static class MatrixOneNormCombiner extends MapReduceBase implements
+ MatrixNormReducer {
+
+ @Override
+ public void reduce(IntWritable key, Iterator<DoubleWritable> values,
+ OutputCollector<IntWritable, DoubleWritable> output, Reporter reporter)
+ throws IOException {
+
+ double partialColSum = 0;
+ while (values.hasNext()) {
+ partialColSum += values.next().get();
+ }
+ nValue.set(partialColSum);
+ output.collect(key, nValue);
+ }
+ }
+
+ /** One Norm Reducer */
+ public static class MatrixOneNormReducer extends MapReduceBase implements
+ MatrixNormReducer {
+ private double max = 0;
+ private Path outDir;
+ private JobConf conf;
+
+ @Override
+ public void configure(JobConf job) {
+ outDir = FileOutputFormat.getOutputPath(job);
+ conf = job;
+ }
+
+ @Override
+ public void reduce(IntWritable key, Iterator<DoubleWritable> values,
+ OutputCollector<IntWritable, DoubleWritable> output, Reporter reporter)
+ throws IOException {
+ double colSum = 0;
+ while (values.hasNext()) {
+ colSum += values.next().get();
+ }
+
+ max = Math.max(Math.abs(colSum), max);
+ }
+
+ @Override
+ public void close() throws IOException {
+ // write output to a file
+ Path outFile = new Path(outDir, "reduce-out");
+ FileSystem fileSys = FileSystem.get(conf);
+ SequenceFile.Writer writer = SequenceFile.createWriter(fileSys, conf,
+ outFile, IntWritable.class, DoubleWritable.class,
+ CompressionType.NONE);
+ writer.append(new IntWritable(-1), new DoubleWritable(max));
+ writer.close();
+ }
+ }
+
+ // /
+ // / Frobenius Norm
+ // /
+
+ /** Frobenius Norm Mapper */
+ public static class MatrixFrobeniusNormMapper extends MapReduceBase implements
+ MatrixNormMapper {
+ @Override
+ public void map(IntWritable key, MapWritable value,
+ OutputCollector<IntWritable, DoubleWritable> output, Reporter reporter)
+ throws IOException {
+ // Sum of squares of this row's entries; the reducer takes the square
+ // root of the global total.
+ double rowSquareSum = 0;
+ for (Map.Entry<Writable, Writable> e : value.entrySet()) {
+ double cellValue = ((DoubleEntry) e.getValue()).getValue();
+ rowSquareSum += (cellValue * cellValue);
+ }
+
+ nValue.set(rowSquareSum);
+ output.collect(nKey, nValue);
+ }
+ }
+
+ /** Frobenius Norm Combiner */
+ public static class MatrixFrobeniusNormCombiner extends MapReduceBase
+ implements MatrixNormReducer {
+
+ @Override
+ public void reduce(IntWritable key, Iterator<DoubleWritable> values,
+ OutputCollector<IntWritable, DoubleWritable> output, Reporter reporter)
+ throws IOException {
+ // Accumulate locally: the framework may invoke a combiner several
+ // times per task, and a running total kept in an instance field would
+ // re-emit earlier partial sums and double-count them.
+ double partialSquareSum = 0;
+ while (values.hasNext()) {
+ partialSquareSum += values.next().get();
+ }
+ // All partial sums of squares share the single key (-1); the reducer
+ // adds them up and takes the square root.
+ nValue.set(partialSquareSum);
+ output.collect(nKey, nValue);
+ }
+ }
+
+ /** Frobenius Norm Reducer */
+ public static class MatrixFrobeniusNormReducer extends MapReduceBase
+ implements MatrixNormReducer {
+ private double squareSum = 0;
+
+ @Override
+ public void reduce(IntWritable key, Iterator<DoubleWritable> values,
+ OutputCollector<IntWritable, DoubleWritable> output, Reporter reporter)
+ throws IOException {
+ while (values.hasNext()) {
+ squareSum += values.next().get();
+ }
+
+ // Note: all partial sums of squares arrive under the single key (-1),
+ // so this reducer runs once and emits the square root of the total,
+ // i.e. the Frobenius norm.
+ nValue.set(Math.sqrt(squareSum));
+ output.collect(nKey, nValue);
+ }
+ }
+
+ // /
+ // / MaxValue Norm
+ // /
+
+ /** MaxValue Norm Mapper */
+ public static class MatrixMaxValueNormMapper extends MapReduceBase implements
+ MatrixNormMapper {
+ @Override
+ public void map(IntWritable key, MapWritable value,
+ OutputCollector<IntWritable, DoubleWritable> output, Reporter reporter)
+ throws IOException {
+ double max = 0;
+ for (Map.Entry<Writable, Writable> e : value.entrySet()) {
+ double cellValue = Math.abs(((DoubleEntry) e.getValue()).getValue());
+ max = cellValue > max ? cellValue : max;
+ }
+
+ nValue.set(max);
+ output.collect(nKey, nValue);
+ }
+
+ }
+
+ /** MaxValue Norm Reducer */
+ public static class MatrixMaxValueNormReducer extends MapReduceBase implements
+ MatrixNormReducer {
+ private double max = 0;
+
+ @Override
+ public void reduce(IntWritable key, Iterator<DoubleWritable> values,
+ OutputCollector<IntWritable, DoubleWritable> output, Reporter reporter)
+ throws IOException {
+ while (values.hasNext()) {
+ max = Math.max(values.next().get(), max);
+ }
+
+ // Note: every row's maximum absolute value is emitted under the single
+ // key (-1), so this reducer sees one key and emits the overall maximum.
+ nValue.set(max);
+ output.collect(nKey, nValue);
+ }
+ }
+}
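
A similar sketch for the catalog class above, computing the Frobenius norm
with its combiner and reducer; the names below are again placeholders:

    HamaConfiguration conf = new HamaConfiguration();
    JobConf jobConf = new JobConf(conf, MatrixNormMapRed.class);
    jobConf.setJobName("matrix Frobenius norm");
    jobConf.setNumReduceTasks(1); // all partial sums share the key -1
    MatrixNormMapRed.initJob("ExampleMatrixTable", "frobenius_output",
        MatrixNormMapRed.MatrixFrobeniusNormMapper.class,
        MatrixNormMapRed.MatrixFrobeniusNormCombiner.class,
        MatrixNormMapRed.MatrixFrobeniusNormReducer.class, jobConf);
    JobClient.runJob(jobConf);
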
Added: incubator/hama/branches/hama-0.19/src/java/org/apache/hama/algebra/MatrixNormReduce.java
URL: http://svn.apache.org/viewvc/incubator/hama/branches/hama-0.19/src/java/org/apache/hama/algebra/MatrixNormReduce.java?rev=813233&view=auto
==============================================================================
--- incubator/hama/branches/hama-0.19/src/java/org/apache/hama/algebra/MatrixNormReduce.java (added)
+++ incubator/hama/branches/hama-0.19/src/java/org/apache/hama/algebra/MatrixNormReduce.java Thu Sep 10 05:32:52 2009
@@ -0,0 +1,83 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.algebra;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.SequenceFileOutputFormat;
+
+public class MatrixNormReduce extends MapReduceBase implements
+ Reducer<IntWritable, DoubleWritable, IntWritable, DoubleWritable> {
+ private double max = 0;
+ private String outDir = "";
+ private JobConf conf;
+ /** Config key holding the norm job's output directory */
+ private static final String OUTPUT = "hama.norm.output.dir";
+
+ public void configure(JobConf job) {
+ outDir = job.get(OUTPUT, "");
+ conf = job;
+ }
+
+ public static void initJob(String path, Class<MatrixNormReduce> reducer,
+ JobConf jobConf) {
+ jobConf.setOutputFormat(SequenceFileOutputFormat.class);
+ jobConf.setReducerClass(reducer);
+ jobConf.setOutputKeyClass(IntWritable.class);
+ jobConf.setOutputValueClass(DoubleWritable.class);
+ jobConf.set(OUTPUT, path);
+ }
+
+ @Override
+ public void reduce(IntWritable key, Iterator<DoubleWritable> values,
+ OutputCollector<IntWritable, DoubleWritable> output, Reporter reporter)
+ throws IOException {
+
+ while (values.hasNext()) {
+ max = Math.max(values.next().get(), max);
+ }
+
+ }
+
+ /**
+ * Called when the reduce task is done; writes the largest value seen to a
+ * sequence file in the configured output directory.
+ */
+ @Override
+ public void close() throws IOException {
+ // write output to a file
+ Path outFile = new Path(outDir, "reduce-out");
+ FileSystem fileSys = FileSystem.get(conf);
+ SequenceFile.Writer writer = SequenceFile.createWriter(fileSys, conf,
+ outFile, IntWritable.class, DoubleWritable.class, CompressionType.NONE);
+ writer.append(new IntWritable(-1), new DoubleWritable(max));
+ writer.close();
+ }
+}
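
The close() above leaves a single record in <outDir>/reduce-out; reading the
norm back might look like this sketch, assuming the output directory used in
the earlier driver example:

    HamaConfiguration conf = new HamaConfiguration();
    FileSystem fs = FileSystem.get(conf);
    Path outFile = new Path("norm_output", "reduce-out");
    SequenceFile.Reader reader = new SequenceFile.Reader(fs, outFile, conf);
    IntWritable key = new IntWritable();
    DoubleWritable norm = new DoubleWritable();
    reader.next(key, norm); // one record: key -1, value = the norm
    reader.close();
    System.out.println("norm = " + norm.get());
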
Added: incubator/hama/branches/hama-0.19/src/java/org/apache/hama/algebra/RowCyclicAdditionMap.java
URL: http://svn.apache.org/viewvc/incubator/hama/branches/hama-0.19/src/java/org/apache/hama/algebra/RowCyclicAdditionMap.java?rev=813233&view=auto
==============================================================================
--- incubator/hama/branches/hama-0.19/src/java/org/apache/hama/algebra/RowCyclicAdditionMap.java (added)
+++ incubator/hama/branches/hama-0.19/src/java/org/apache/hama/algebra/RowCyclicAdditionMap.java Thu Sep 10 05:32:52 2009
@@ -0,0 +1,94 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.algebra;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.MapWritable;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hama.Constants;
+import org.apache.hama.DenseMatrix;
+import org.apache.hama.DenseVector;
+import org.apache.hama.HamaConfiguration;
+import org.apache.hama.mapred.VectorInputFormat;
+import org.apache.log4j.Logger;
+
+public class RowCyclicAdditionMap extends MapReduceBase implements
+Mapper<IntWritable, MapWritable, IntWritable, MapWritable> {
+ static final Logger LOG = Logger.getLogger(RowCyclicAdditionMap.class);
+ protected DenseMatrix[] matrix_summands;
+ protected double[] matrix_alphas;
+ public static final String MATRIX_SUMMANDS = "hama.addition.summands";
+ public static final String MATRIX_ALPHAS = "hama.addition.alphas";
+
+ public void configure(JobConf job) {
+ try {
+ String[] matrix_names = job.get(MATRIX_SUMMANDS, "").split(",");
+ String[] matrix_alpha_strs = job.get(MATRIX_ALPHAS, "").split(",");
+ assert(matrix_names.length == matrix_alpha_strs.length && matrix_names.length >= 1);
+
+ matrix_summands = new DenseMatrix[matrix_names.length];
+ matrix_alphas = new double[matrix_names.length];
+ for(int i=0; i<matrix_names.length; i++) {
+ matrix_summands[i] = new DenseMatrix(new HamaConfiguration(job), matrix_names[i]);
+ matrix_alphas[i] = Double.valueOf(matrix_alpha_strs[i]);
+ }
+ } catch (IOException e) {
+ LOG.warn("Load matrix_b failed : " + e.getMessage());
+ }
+ }
+
+ public static void initJob(String matrix_a, String matrix_summandlist,
+ String matrix_alphalist, Class<RowCyclicAdditionMap> map,
+ Class<IntWritable> outputKeyClass, Class<MapWritable> outputValueClass,
+ JobConf jobConf) {
+
+ jobConf.setMapOutputValueClass(outputValueClass);
+ jobConf.setMapOutputKeyClass(outputKeyClass);
+ jobConf.setMapperClass(map);
+ jobConf.set(MATRIX_SUMMANDS, matrix_summandlist);
+ jobConf.set(MATRIX_ALPHAS, matrix_alphalist);
+
+ jobConf.setInputFormat(VectorInputFormat.class);
+ FileInputFormat.addInputPaths(jobConf, matrix_a);
+ jobConf.set(VectorInputFormat.COLUMN_LIST, Constants.COLUMN);
+ }
+
+ @Override
+ public void map(IntWritable key, MapWritable value,
+ OutputCollector<IntWritable, MapWritable> output, Reporter reporter)
+ throws IOException {
+
+ DenseVector result = new DenseVector(value);
+ DenseVector summand;
+ for(int i=0; i<matrix_summands.length; i++) {
+ summand = matrix_summands[i].getRow(key.get());
+ result = result.add(matrix_alphas[i], summand);
+ }
+ output.collect(key, result.getEntries());
+
+ }
+}
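
The two comma-separated lists set by initJob above are positional: entry i of
MATRIX_ALPHAS scales entry i of MATRIX_SUMMANDS. With the hypothetical
settings below, each map task would compute row(A) + 2.0 * row(B) +
0.5 * row(C):

    jobConf.set(RowCyclicAdditionMap.MATRIX_SUMMANDS, "MatrixB_table,MatrixC_table");
    jobConf.set(RowCyclicAdditionMap.MATRIX_ALPHAS, "2.0,0.5");
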
Added: incubator/hama/branches/hama-0.19/src/java/org/apache/hama/algebra/RowCyclicAdditionReduce.java
URL: http://svn.apache.org/viewvc/incubator/hama/branches/hama-0.19/src/java/org/apache/hama/algebra/RowCyclicAdditionReduce.java?rev=813233&view=auto
==============================================================================
--- incubator/hama/branches/hama-0.19/src/java/org/apache/hama/algebra/RowCyclicAdditionReduce.java (added)
+++ incubator/hama/branches/hama-0.19/src/java/org/apache/hama/algebra/RowCyclicAdditionReduce.java Thu Sep 10 05:32:52 2009
@@ -0,0 +1,67 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.algebra;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.hadoop.hbase.io.BatchUpdate;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.MapWritable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hama.io.VectorUpdate;
+import org.apache.hama.mapred.VectorOutputFormat;
+
+public class RowCyclicAdditionReduce extends MapReduceBase implements
+ Reducer<IntWritable, MapWritable, IntWritable, VectorUpdate> {
+
+ /**
+ * Use this before submitting a TableReduce job. It will appropriately set up
+ * the JobConf.
+ *
+ * @param table the output matrix's table name
+ * @param reducer the reducer class
+ * @param job the job configuration to set up
+ */
+ public static void initJob(String table, Class<RowCyclicAdditionReduce> reducer,
+ JobConf job) {
+ job.setOutputFormat(VectorOutputFormat.class);
+ job.setReducerClass(reducer);
+ job.set(VectorOutputFormat.OUTPUT_TABLE, table);
+ job.setOutputKeyClass(IntWritable.class);
+ job.setOutputValueClass(BatchUpdate.class);
+ }
+
+ @Override
+ public void reduce(IntWritable key, Iterator<MapWritable> values,
+ OutputCollector<IntWritable, VectorUpdate> output, Reporter reporter)
+ throws IOException {
+
+ VectorUpdate update = new VectorUpdate(key.get());
+ update.putAll(values.next());
+
+ output.collect(key, update);
+ }
+
+}
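
Putting the two halves together, a driver for the addition job could look
like this sketch; the table names are placeholders and the result table is
assumed to already exist:

    HamaConfiguration conf = new HamaConfiguration();
    JobConf jobConf = new JobConf(conf, RowCyclicAdditionMap.class);
    jobConf.setJobName("row-cyclic matrix addition");
    RowCyclicAdditionMap.initJob("MatrixA_table", "MatrixB_table,MatrixC_table",
        "2.0,0.5", RowCyclicAdditionMap.class, IntWritable.class,
        MapWritable.class, jobConf);
    RowCyclicAdditionReduce.initJob("Result_table",
        RowCyclicAdditionReduce.class, jobConf);
    JobClient.runJob(jobConf);
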