You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by ed...@apache.org on 2008/07/29 11:33:20 UTC
svn commit: r680652 - in /incubator/hama/trunk: ./ src/java/org/apache/hama/
src/java/org/apache/hama/io/ src/java/org/apache/hama/mapred/
src/test/org/apache/hama/ src/test/org/apache/hama/mapred/
Author: edwardyoon
Date: Tue Jul 29 02:33:18 2008
New Revision: 680652
URL: http://svn.apache.org/viewvc?rev=680652&view=rev
Log:
https://issues.apache.org/jira/browse/HAMA-10
Added:
incubator/hama/trunk/src/java/org/apache/hama/AbstractBase.java
incubator/hama/trunk/src/java/org/apache/hama/Constants.java
incubator/hama/trunk/src/java/org/apache/hama/Vector.java
incubator/hama/trunk/src/java/org/apache/hama/VectorInterface.java
incubator/hama/trunk/src/java/org/apache/hama/io/
incubator/hama/trunk/src/java/org/apache/hama/io/VectorDatum.java
incubator/hama/trunk/src/java/org/apache/hama/mapred/
incubator/hama/trunk/src/java/org/apache/hama/mapred/MatrixInputFormat.java
incubator/hama/trunk/src/java/org/apache/hama/mapred/MatrixInputFormatBase.java
incubator/hama/trunk/src/java/org/apache/hama/mapred/MatrixMap.java
incubator/hama/trunk/src/java/org/apache/hama/mapred/MatrixReduce.java
incubator/hama/trunk/src/test/org/apache/hama/TestVector.java
Removed:
incubator/hama/trunk/src/java/org/apache/hama/HamaConstants.java
incubator/hama/trunk/src/test/org/apache/hama/TestFeatureVector.java
Modified:
incubator/hama/trunk/CHANGES.txt
incubator/hama/trunk/src/java/org/apache/hama/AbstractMatrix.java
incubator/hama/trunk/src/java/org/apache/hama/Matrix.java
incubator/hama/trunk/src/java/org/apache/hama/RandomVariable.java
incubator/hama/trunk/src/test/org/apache/hama/mapred/TestMatrixMapReduce.java
Modified: incubator/hama/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/CHANGES.txt?rev=680652&r1=680651&r2=680652&view=diff
==============================================================================
--- incubator/hama/trunk/CHANGES.txt (original)
+++ incubator/hama/trunk/CHANGES.txt Tue Jul 29 02:33:18 2008
@@ -2,6 +2,8 @@
Trunk (unreleased changes)
+ HAMA-10: Refactor the mapred package for the latest version of dependencies (edwardyoon)
+ HAMA-9: Upgrade dependencies (edwardyoon)
HAMA-6: Add a 'who we are' page (edwardyoon)
HAMA-7: Add some information for a new comer (edwardyoon)
HAMA-1: Create the Hama web site (edwardyoon via Ian Holsman)
Added: incubator/hama/trunk/src/java/org/apache/hama/AbstractBase.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/AbstractBase.java?rev=680652&view=auto
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/AbstractBase.java (added)
+++ incubator/hama/trunk/src/java/org/apache/hama/AbstractBase.java Tue Jul 29 02:33:18 2008
@@ -0,0 +1,15 @@
+package org.apache.hama;
+
+public abstract class AbstractBase {
+ /**
+ * Return the integer column index
+ *
+ * @param b key
+ * @return integer
+ */
+ public int getColumnIndex(byte[] b) {
+ String cKey = new String(b);
+ return Integer.parseInt(cKey
+ .substring(cKey.indexOf(":") + 1, cKey.length()));
+ }
+}
Modified: incubator/hama/trunk/src/java/org/apache/hama/AbstractMatrix.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/AbstractMatrix.java?rev=680652&r1=680651&r2=680652&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/AbstractMatrix.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/AbstractMatrix.java Tue Jul 29 02:33:18 2008
@@ -38,7 +38,7 @@
/**
* Methods of the matrix classes
*/
-public abstract class AbstractMatrix implements MatrixInterface {
+public abstract class AbstractMatrix extends AbstractBase implements MatrixInterface {
static final Logger LOG = Logger.getLogger(AbstractMatrix.class);
/** Hbase Configuration */
@@ -77,7 +77,7 @@
*/
protected void create() {
try {
- tableDesc.addFamily(new HColumnDescriptor(HamaConstants.METADATA
+ tableDesc.addFamily(new HColumnDescriptor(Constants.METADATA
.toString()));
LOG.info("Initializing.");
admin.createTable(tableDesc);
@@ -90,7 +90,7 @@
public int getRowDimension() {
Cell rows = null;
try {
- rows = table.get(HamaConstants.METADATA, HamaConstants.METADATA_ROWS);
+ rows = table.get(Constants.METADATA, Constants.METADATA_ROWS);
} catch (IOException e) {
LOG.error(e, e);
}
@@ -102,8 +102,8 @@
public int getColumnDimension() {
Cell columns = null;
try {
- columns = table.get(HamaConstants.METADATA,
- HamaConstants.METADATA_COLUMNS);
+ columns = table.get(Constants.METADATA,
+ Constants.METADATA_COLUMNS);
} catch (IOException e) {
LOG.error(e, e);
}
@@ -113,7 +113,7 @@
/** {@inheritDoc} */
public double get(int i, int j) {
Text row = new Text(String.valueOf(i));
- Text column = new Text(HamaConstants.COLUMN + String.valueOf(j));
+ Text column = new Text(Constants.COLUMN + String.valueOf(j));
Cell c;
double result = -1;
try {
@@ -157,7 +157,7 @@
/** {@inheritDoc} */
public void set(int i, int j, double d) {
BatchUpdate b = new BatchUpdate(new Text(String.valueOf(i)));
- b.put(new Text(HamaConstants.COLUMN + String.valueOf(j)), doubleToBytes(d));
+ b.put(new Text(Constants.COLUMN + String.valueOf(j)), doubleToBytes(d));
try {
table.commit(b);
} catch (IOException e) {
@@ -187,9 +187,9 @@
/** {@inheritDoc} */
public void setDimension(int rows, int columns) {
- BatchUpdate b = new BatchUpdate(HamaConstants.METADATA);
- b.put(HamaConstants.METADATA_ROWS, Bytes.toBytes(rows));
- b.put(HamaConstants.METADATA_COLUMNS, Bytes.toBytes(columns));
+ BatchUpdate b = new BatchUpdate(Constants.METADATA);
+ b.put(Constants.METADATA_ROWS, Bytes.toBytes(rows));
+ b.put(Constants.METADATA_COLUMNS, Bytes.toBytes(columns));
try {
table.commit(b);
@@ -211,8 +211,8 @@
public double getDeterminant() {
try {
return bytesToDouble(table.get(
- new Text(String.valueOf(HamaConstants.DETERMINANT)),
- new Text(HamaConstants.COLUMN)).getValue());
+ new Text(String.valueOf(Constants.DETERMINANT)),
+ new Text(Constants.COLUMN)).getValue());
} catch (IOException e) {
LOG.error(e, e);
return -1;
@@ -245,16 +245,4 @@
LOG.error(e, e);
}
}
-
- /**
- * Return the integer column index
- *
- * @param b key
- * @return integer
- */
- public int getColumnIndex(byte[] b) {
- String cKey = new String(b);
- return Integer.parseInt(cKey
- .substring(cKey.indexOf(":") + 1, cKey.length()));
- }
}
Added: incubator/hama/trunk/src/java/org/apache/hama/Constants.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/Constants.java?rev=680652&view=auto
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/Constants.java (added)
+++ incubator/hama/trunk/src/java/org/apache/hama/Constants.java Tue Jul 29 02:33:18 2008
@@ -0,0 +1,62 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama;
+
+import org.apache.hadoop.io.Text;
+
+/**
+ * Some constants used in Hama
+ */
+public class Constants {
+ /** Meta-columnfamily to store the matrix-info */
+ public final static Text METADATA = new Text("metadata:");
+ /** The number of the matrix rows */
+ public final static Text METADATA_ROWS = new Text("metadata:rows");
+ /** The number of the matrix columns */
+ public final static Text METADATA_COLUMNS = new Text("metadata:columns");
+ /** The type of the matrix */
+ public final static Text METADATA_TYPE = new Text("metadata:type");
+
+ /** plus operator */
+ public final static String PLUS = "+";
+ /** minus operator */
+ public final static String MINUS = "-";
+
+ /** Default columnfamily name */
+ public final static Text COLUMN = new Text("column:");
+ /** The numerator version of the fraction matrix */
+ public final static Text NUMERATOR = new Text("numerator:");
+ /** The denominator version of the fraction matrix */
+ public final static Text DENOMINATOR = new Text("denominator:");
+ /** The original version of the fraction matrix */
+ public final static Text ORIGINAL = new Text("original:");
+ /** The lower matrix version of the triangular matrix */
+ public final static Text LOWER = new Text("lower:");
+ /** The upper matrix version of the triangular matrix */
+ public final static Text UPPER = new Text("upper:");
+
+ /** A determinant value record */
+ public final static Text DETERMINANT = new Text("determinant");
+
+ /** Temporary random matrices name-head */
+ public final static String RANDOM = "rand";
+ /** Temporary result matrices name-head */
+ public final static String RESULT = "result";
+}
Modified: incubator/hama/trunk/src/java/org/apache/hama/Matrix.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/Matrix.java?rev=680652&r1=680651&r2=680652&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/Matrix.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/Matrix.java Tue Jul 29 02:33:18 2008
@@ -55,7 +55,7 @@
if (!admin.tableExists(matrixName)) {
tableDesc = new HTableDescriptor(matrixName.toString());
- tableDesc.addFamily(new HColumnDescriptor(HamaConstants.COLUMN
+ tableDesc.addFamily(new HColumnDescriptor(Constants.COLUMN
.toString()));
create();
}
@@ -82,7 +82,7 @@
if (!admin.tableExists(matrixName)) {
tableDesc = new HTableDescriptor(matrixName.toString());
- tableDesc.addFamily(new HColumnDescriptor(HamaConstants.COLUMN
+ tableDesc.addFamily(new HColumnDescriptor(Constants.COLUMN
.toString()));
create();
}
@@ -154,12 +154,12 @@
/** {@inheritDoc} */
public Matrix addition(Matrix b) {
- return additionSubtraction(b, HamaConstants.PLUS);
+ return additionSubtraction(b, Constants.PLUS);
}
/** {@inheritDoc} */
public Matrix subtraction(Matrix b) {
- return additionSubtraction(b, HamaConstants.PLUS);
+ return additionSubtraction(b, Constants.MINUS);
}
/**
Modified: incubator/hama/trunk/src/java/org/apache/hama/RandomVariable.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/RandomVariable.java?rev=680652&r1=680651&r2=680652&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/RandomVariable.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/RandomVariable.java Tue Jul 29 02:33:18 2008
@@ -56,7 +56,7 @@
* @return random name
*/
protected static Text randMatrixName() {
- String rName = HamaConstants.RANDOM;
+ String rName = Constants.RANDOM;
for (int i = 1; i <= 5; i++) {
char ch = (char) ((Math.random() * 26) + 97);
rName += ch;
Added: incubator/hama/trunk/src/java/org/apache/hama/Vector.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/Vector.java?rev=680652&view=auto
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/Vector.java (added)
+++ incubator/hama/trunk/src/java/org/apache/hama/Vector.java Tue Jul 29 02:33:18 2008
@@ -0,0 +1,127 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.hbase.io.Cell;
+import org.apache.hadoop.hbase.io.HbaseMapWritable;
+import org.apache.hadoop.hbase.io.RowResult;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hama.io.VectorDatum;
+import org.apache.log4j.Logger;
+
+public class Vector extends AbstractBase implements VectorInterface {
+ static final Logger LOG = Logger.getLogger(Vector.class);
+ protected int[] m_dims;
+ protected double[] m_vals;
+
+ public Vector(RowResult r) {
+ parse(r.entrySet());
+ }
+
+ public Vector(VectorDatum r) {
+ parse(r.entrySet());
+ }
+
+ private void parse(Set<Entry<byte[], Cell>> entrySet) {
+ SortedMap<Integer, Double> m = new TreeMap<Integer, Double>();
+ for (Map.Entry<byte[], Cell> f : entrySet) {
+ m.put(getColumnIndex(f.getKey()), Double.parseDouble(Bytes.toString(f
+ .getValue().getValue())));
+ }
+
+ this.m_dims = new int[m.keySet().size()];
+ this.m_vals = new double[m.keySet().size()];
+
+ int i = 0;
+ for (Map.Entry<Integer, Double> f : m.entrySet()) {
+ this.m_dims[i] = f.getKey();
+ this.m_vals[i] = f.getValue();
+ i++;
+ }
+ }
+
+ /**
+ * Returns the cosine similarity between two feature vectors.
+ */
+ public double getCosine(Vector v) {
+ double cosine = 0.0;
+ int dim;
+ double q_i, d_i;
+ for (int i = 0; i < Math.min(this.size(), v.size()); i++) {
+ dim = v.getDimAt(i);
+ q_i = v.getValueAt(dim);
+ d_i = this.getValueAt(dim);
+ cosine += q_i * d_i;
+ }
+ return cosine / (this.getL2Norm() * v.getL2Norm());
+ }
+
+ /**
+ * Returns the linear norm factor of this vector's values (i.e., the sum of
+ * it's values).
+ */
+ public double getL1Norm() {
+ double sum = 0.0;
+ for (int i = 0; i < m_vals.length; i++) {
+ sum += m_vals[i];
+ }
+ return sum;
+ }
+
+ /**
+ * Returns the L2 norm factor of this vector's values.
+ */
+ public double getL2Norm() {
+ double square_sum = 0.0;
+ for (int i = 0; i < m_vals.length; i++) {
+ square_sum += (m_vals[i] * m_vals[i]);
+ }
+ return Math.sqrt(square_sum);
+ }
+
+ public int getDimAt(int index) {
+ return m_dims[index];
+ }
+
+ public double getValueAt(int index) {
+ return m_vals[index];
+ }
+
+ public int size() {
+ return m_dims.length;
+ }
+
+ public VectorDatum addition(byte[] bs, Vector v2) {
+ HbaseMapWritable<byte[], Cell> trunk = new HbaseMapWritable<byte[], Cell>();
+ for (int i = 0; i < this.size(); i++) {
+ double value = (this.getValueAt(i) + v2.getValueAt(i));
+ Cell cValue = new Cell(String.valueOf(value), 0);
+ trunk.put(Bytes.toBytes("column:" + i), cValue);
+ }
+
+ return new VectorDatum(bs, trunk);
+ }
+}
Added: incubator/hama/trunk/src/java/org/apache/hama/VectorInterface.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/VectorInterface.java?rev=680652&view=auto
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/VectorInterface.java (added)
+++ incubator/hama/trunk/src/java/org/apache/hama/VectorInterface.java Tue Jul 29 02:33:18 2008
@@ -0,0 +1,30 @@
+package org.apache.hama;
+
+import org.apache.hama.io.VectorDatum;
+
+/**
+ * A vector. Features are dimension-value pairs. This class implements a
+ * simple dictionary data structure to map dimensions onto their values. Note
+ * that for convenience, features do not have be sorted according to their
+ * dimensions at this point. The SVMLightTrainer class has an option for sorting
+ * input vectors prior to training.
+ */
+public interface VectorInterface {
+
+ public double getValueAt(int index);
+
+ public int getDimAt(int index);
+
+ public int size();
+
+ public double getL1Norm();
+
+ public double getL2Norm();
+
+ public double getCosine(Vector v);
+
+ public VectorDatum addition(byte[] bs, Vector v2);
+
+ // TODO: save, copy,...,etc
+
+}
Added: incubator/hama/trunk/src/java/org/apache/hama/io/VectorDatum.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/io/VectorDatum.java?rev=680652&view=auto
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/io/VectorDatum.java (added)
+++ incubator/hama/trunk/src/java/org/apache/hama/io/VectorDatum.java Tue Jul 29 02:33:18 2008
@@ -0,0 +1,193 @@
+package org.apache.hama.io;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.io.Cell;
+import org.apache.hadoop.hbase.io.HbaseMapWritable;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Writables;
+import org.apache.hadoop.io.Writable;
+import org.apache.hama.Vector;
+
+public class VectorDatum implements Writable, Map<byte[], Cell> {
+ private byte[] row = null;
+ private final HbaseMapWritable<byte[], Cell> cells;
+
+ public VectorDatum() {
+ this(null, new HbaseMapWritable<byte[], Cell>());
+ }
+
+ /**
+ * Create a RowResult from a row and Cell map
+ */
+ public VectorDatum(final byte[] row, final HbaseMapWritable<byte[], Cell> m) {
+ this.row = row;
+ this.cells = m;
+ }
+
+ /**
+ * Get the row for this RowResult
+ */
+ public byte[] getRow() {
+ return row;
+ }
+
+ public Cell put(@SuppressWarnings("unused")
+ byte[] key, @SuppressWarnings("unused")
+ Cell value) {
+ throw new UnsupportedOperationException("VectorDatum is read-only!");
+ }
+
+ @SuppressWarnings("unchecked")
+ public void putAll(@SuppressWarnings("unused")
+ Map map) {
+ throw new UnsupportedOperationException("VectorDatum is read-only!");
+ }
+
+ public Cell get(Object key) {
+ return (Cell) this.cells.get(key);
+ }
+
+ public Cell remove(@SuppressWarnings("unused")
+ Object key) {
+ throw new UnsupportedOperationException("VectorDatum is read-only!");
+ }
+
+ public boolean containsKey(Object key) {
+ return cells.containsKey(key);
+ }
+
+ public boolean containsValue(@SuppressWarnings("unused")
+ Object value) {
+ throw new UnsupportedOperationException("Don't support containsValue!");
+ }
+
+ public boolean isEmpty() {
+ return cells.isEmpty();
+ }
+
+ public int size() {
+ return cells.size();
+ }
+
+ public void clear() {
+ throw new UnsupportedOperationException("VectorDatum is read-only!");
+ }
+
+ public Set<byte[]> keySet() {
+ Set<byte[]> result = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
+ for (byte[] w : cells.keySet()) {
+ result.add(w);
+ }
+ return result;
+ }
+
+ public Set<Map.Entry<byte[], Cell>> entrySet() {
+ return Collections.unmodifiableSet(this.cells.entrySet());
+ }
+
+ public Collection<Cell> values() {
+ ArrayList<Cell> result = new ArrayList<Cell>();
+ for (Writable w : cells.values()) {
+ result.add((Cell) w);
+ }
+ return result;
+ }
+
+ /**
+ * Get the Cell that corresponds to column
+ */
+ public Cell get(byte[] column) {
+ return this.cells.get(column);
+ }
+
+ /**
+ * Get the Cell that corresponds to column, using a String key
+ */
+ public Cell get(String key) {
+ return get(Bytes.toBytes(key));
+ }
+
+ /**
+ * Row entry.
+ */
+ public class Entry implements Map.Entry<byte[], Cell> {
+ private final byte[] column;
+ private final Cell cell;
+
+ Entry(byte[] row, Cell cell) {
+ this.column = row;
+ this.cell = cell;
+ }
+
+ public Cell setValue(@SuppressWarnings("unused")
+ Cell c) {
+ throw new UnsupportedOperationException("VectorDatum is read-only!");
+ }
+
+ public byte[] getKey() {
+ return column;
+ }
+
+ public Cell getValue() {
+ return cell;
+ }
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder();
+ sb.append("row=");
+ sb.append(Bytes.toString(this.row));
+ sb.append(", cells={");
+ boolean moreThanOne = false;
+ for (Map.Entry<byte[], Cell> e : this.cells.entrySet()) {
+ if (moreThanOne) {
+ sb.append(", ");
+ } else {
+ moreThanOne = true;
+ }
+ sb.append("(column=");
+ sb.append(Bytes.toString(e.getKey()));
+ sb.append(", timestamp=");
+ sb.append(Long.toString(e.getValue().getTimestamp()));
+ sb.append(", value=");
+ byte[] v = e.getValue().getValue();
+ if (Bytes.equals(e.getKey(), HConstants.COL_REGIONINFO)) {
+ try {
+ sb.append(Writables.getHRegionInfo(v).toString());
+ } catch (IOException ioe) {
+ sb.append(ioe.toString());
+ }
+ } else {
+ sb.append(v);
+ }
+ sb.append(")");
+ }
+ sb.append("}");
+ return sb.toString();
+ }
+
+ public void readFields(final DataInput in) throws IOException {
+ this.row = Bytes.readByteArray(in);
+ this.cells.readFields(in);
+ }
+
+ public void write(final DataOutput out) throws IOException {
+ Bytes.writeByteArray(out, this.row);
+ this.cells.write(out);
+ }
+
+ public Vector getVector() {
+ return new Vector(this);
+ }
+}
Added: incubator/hama/trunk/src/java/org/apache/hama/mapred/MatrixInputFormat.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/mapred/MatrixInputFormat.java?rev=680652&view=auto
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/mapred/MatrixInputFormat.java (added)
+++ incubator/hama/trunk/src/java/org/apache/hama/mapred/MatrixInputFormat.java Tue Jul 29 02:33:18 2008
@@ -0,0 +1,58 @@
+package org.apache.hama.mapred;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobConfigurable;
+
+public class MatrixInputFormat extends MatrixInputFormatBase implements
+JobConfigurable {
+private final Log LOG = LogFactory.getLog(MatrixInputFormat.class);
+
+/**
+* space delimited list of columns
+*
+* @see org.apache.hadoop.hbase.regionserver.HAbstractScanner for column name
+* wildcards
+*/
+public static final String COLUMN_LIST = "hbase.mapred.tablecolumns";
+
+/** {@inheritDoc} */
+public void configure(JobConf job) {
+Path[] tableNames = FileInputFormat.getInputPaths(job);
+String colArg = job.get(COLUMN_LIST);
+String[] colNames = colArg.split(" ");
+byte [][] m_cols = new byte[colNames.length][];
+for (int i = 0; i < m_cols.length; i++) {
+ m_cols[i] = Bytes.toBytes(colNames[i]);
+}
+setInputColums(m_cols);
+try {
+ setHTable(new HTable(new HBaseConfiguration(job), tableNames[0].getName()));
+} catch (Exception e) {
+ LOG.error(e);
+}
+}
+
+/** {@inheritDoc} */
+public void validateInput(JobConf job) throws IOException {
+// expecting exactly one path
+Path [] tableNames = FileInputFormat.getInputPaths(job);
+if (tableNames == null || tableNames.length > 1) {
+ throw new IOException("expecting one table name");
+}
+
+// expecting at least one column
+String colArg = job.get(COLUMN_LIST);
+if (colArg == null || colArg.length() == 0) {
+ throw new IOException("expecting at least one column");
+}
+}
+}
Added: incubator/hama/trunk/src/java/org/apache/hama/mapred/MatrixInputFormatBase.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/mapred/MatrixInputFormatBase.java?rev=680652&view=auto
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/mapred/MatrixInputFormatBase.java (added)
+++ incubator/hama/trunk/src/java/org/apache/hama/mapred/MatrixInputFormatBase.java Tue Jul 29 02:33:18 2008
@@ -0,0 +1,275 @@
+package org.apache.hama.mapred;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Scanner;
+import org.apache.hadoop.hbase.filter.RowFilterInterface;
+import org.apache.hadoop.hbase.filter.RowFilterSet;
+import org.apache.hadoop.hbase.filter.StopRowFilter;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.io.RowResult;
+import org.apache.hadoop.hbase.mapred.TableSplit;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.util.Writables;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hama.io.VectorDatum;
+
+public abstract class MatrixInputFormatBase
+implements InputFormat<ImmutableBytesWritable, VectorDatum> {
+ private final Log LOG = LogFactory.getLog(MatrixInputFormatBase.class);
+ private byte [][] inputColumns;
+ private HTable table;
+ private TableRecordReader tableRecordReader;
+ private RowFilterInterface rowFilter;
+
+ /**
+ * Iterate over an HBase table data, return (Text, VectorResult) pairs
+ */
+ protected class TableRecordReader
+ implements RecordReader<ImmutableBytesWritable, VectorDatum> {
+ private byte [] startRow;
+ private byte [] endRow;
+ private RowFilterInterface trrRowFilter;
+ private Scanner scanner;
+ private HTable htable;
+ private byte [][] trrInputColumns;
+
+ /**
+ * Build the scanner. Not done in constructor to allow for extension.
+ *
+ * @throws IOException
+ */
+ public void init() throws IOException {
+ if ((endRow != null) && (endRow.length > 0)) {
+ if (trrRowFilter != null) {
+ final Set<RowFilterInterface> rowFiltersSet =
+ new HashSet<RowFilterInterface>();
+ rowFiltersSet.add(new StopRowFilter(endRow));
+ rowFiltersSet.add(trrRowFilter);
+ this.scanner = this.htable.getScanner(trrInputColumns, startRow,
+ new RowFilterSet(RowFilterSet.Operator.MUST_PASS_ALL,
+ rowFiltersSet));
+ } else {
+ this.scanner =
+ this.htable.getScanner(trrInputColumns, startRow, endRow);
+ }
+ } else {
+ this.scanner =
+ this.htable.getScanner(trrInputColumns, startRow, trrRowFilter);
+ }
+ }
+
+ /**
+ * @param htable the {@link HTable} to scan.
+ */
+ public void setHTable(HTable htable) {
+ this.htable = htable;
+ }
+
+ /**
+ * @param inputColumns the columns to be placed in {@link VectorDatum}.
+ */
+ public void setInputColumns(final byte [][] inputColumns) {
+ this.trrInputColumns = inputColumns;
+ }
+
+ /**
+ * @param startRow the first row in the split
+ */
+ public void setStartRow(final byte [] startRow) {
+ this.startRow = startRow;
+ }
+
+ /**
+ *
+ * @param endRow the last row in the split
+ */
+ public void setEndRow(final byte [] endRow) {
+ this.endRow = endRow;
+ }
+
+ /**
+ * @param rowFilter the {@link RowFilterInterface} to be used.
+ */
+ public void setRowFilter(RowFilterInterface rowFilter) {
+ this.trrRowFilter = rowFilter;
+ }
+
+ /** {@inheritDoc} */
+ public void close() throws IOException {
+ this.scanner.close();
+ }
+
+ /**
+ * @return ImmutableBytesWritable
+ *
+ * @see org.apache.hadoop.mapred.RecordReader#createKey()
+ */
+ public ImmutableBytesWritable createKey() {
+ return new ImmutableBytesWritable();
+ }
+
+ /**
+ * @return VectorResult
+ *
+ * @see org.apache.hadoop.mapred.RecordReader#createValue()
+ */
+ public VectorDatum createValue() {
+ return new VectorDatum();
+ }
+
+ /** {@inheritDoc} */
+ public long getPos() {
+ // This should be the ordinal tuple in the range;
+ // not clear how to calculate...
+ return 0;
+ }
+
+ /** {@inheritDoc} */
+ public float getProgress() {
+ // Depends on the total number of tuples and getPos
+ return 0;
+ }
+
+ /**
+ * @param key HStoreKey as input key.
+ * @param value MapWritable as input value
+ *
+ * Converts Scanner.next() to Text, VectorResult
+ *
+ * @return true if there was more data
+ * @throws IOException
+ */
+ @SuppressWarnings("unchecked")
+ public boolean next(ImmutableBytesWritable key, VectorDatum value)
+ throws IOException {
+ RowResult result = this.scanner.next();
+ boolean hasMore = result != null && result.size() > 0;
+ if (hasMore) {
+ key.set(result.getRow());
+ Writables.copyWritable(result, value);
+ }
+ return hasMore;
+ }
+ }
+
+ /**
+ * Builds a TableRecordReader. If no TableRecordReader was provided, uses
+ * the default.
+ *
+ * @see org.apache.hadoop.mapred.InputFormat#getRecordReader(InputSplit,
+ * JobConf, Reporter)
+ */
+ public RecordReader<ImmutableBytesWritable, VectorDatum> getRecordReader(InputSplit split,
+ @SuppressWarnings("unused")
+ JobConf job, @SuppressWarnings("unused")
+ Reporter reporter)
+ throws IOException {
+ TableSplit tSplit = (TableSplit) split;
+ TableRecordReader trr = this.tableRecordReader;
+ // if no table record reader was provided use default
+ if (trr == null) {
+ trr = new TableRecordReader();
+ }
+ trr.setStartRow(tSplit.getStartRow());
+ trr.setEndRow(tSplit.getEndRow());
+ trr.setHTable(this.table);
+ trr.setInputColumns(this.inputColumns);
+ trr.setRowFilter(this.rowFilter);
+ trr.init();
+ return trr;
+ }
+
+ /**
+ * Calculates the splits that will serve as input for the map tasks.
+ * <ul>
+ * Splits are created in number equal to the smallest between numSplits and
+ * the number of {@link HRegion}s in the table. If the number of splits is
+ * smaller than the number of {@link HRegion}s then splits are spanned across
+ * multiple {@link HRegion}s and are grouped the most evenly possible. In the
+ * case splits are uneven the bigger splits are placed first in the
+ * {@link InputSplit} array.
+ *
+ * @param job the map task {@link JobConf}
+ * @param numSplits a hint to calculate the number of splits
+ *
+ * @return the input splits
+ *
+ * @see org.apache.hadoop.mapred.InputFormat#getSplits(org.apache.hadoop.mapred.JobConf, int)
+ */
+ public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
+ byte [][] startKeys = this.table.getStartKeys();
+ if (startKeys == null || startKeys.length == 0) {
+ throw new IOException("Expecting at least one region");
+ }
+ if (this.table == null) {
+ throw new IOException("No table was provided");
+ }
+ if (this.inputColumns == null || this.inputColumns.length == 0) {
+ throw new IOException("Expecting at least one column");
+ }
+ int realNumSplits = numSplits > startKeys.length ? startKeys.length
+ : numSplits;
+ InputSplit[] splits = new InputSplit[realNumSplits];
+ int middle = startKeys.length / realNumSplits;
+ int startPos = 0;
+ for (int i = 0; i < realNumSplits; i++) {
+ int lastPos = startPos + middle;
+ lastPos = startKeys.length % realNumSplits > i ? lastPos + 1 : lastPos;
+ splits[i] = new TableSplit(this.table.getTableName(),
+ startKeys[startPos], ((i + 1) < realNumSplits) ? startKeys[lastPos]
+ : HConstants.EMPTY_START_ROW);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("split: " + i + "->" + splits[i]);
+ }
+ startPos = lastPos;
+ }
+ return splits;
+
+ }
+
+ /**
+ * @param inputColumns to be passed in {@link VectorDatum} to the map task.
+ */
+ protected void setInputColums(byte [][] inputColumns) {
+ this.inputColumns = inputColumns;
+ }
+
+ /**
+ * Allows subclasses to set the {@link HTable}.
+ *
+ * @param table to get the data from
+ */
+ protected void setHTable(HTable table) {
+ this.table = table;
+ }
+
+ /**
+ * Allows subclasses to set the {@link TableRecordReader}.
+ *
+ * @param tableRecordReader
+ * to provide other {@link TableRecordReader} implementations.
+ */
+ protected void setTableRecordReader(TableRecordReader tableRecordReader) {
+ this.tableRecordReader = tableRecordReader;
+ }
+
+ /**
+ * Allows subclasses to set the {@link RowFilterInterface} to be used.
+ *
+ * @param rowFilter
+ */
+ protected void setRowFilter(RowFilterInterface rowFilter) {
+ this.rowFilter = rowFilter;
+ }
+}
\ No newline at end of file
Added: incubator/hama/trunk/src/java/org/apache/hama/mapred/MatrixMap.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/mapred/MatrixMap.java?rev=680652&view=auto
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/mapred/MatrixMap.java (added)
+++ incubator/hama/trunk/src/java/org/apache/hama/mapred/MatrixMap.java Tue Jul 29 02:33:18 2008
@@ -0,0 +1,54 @@
+package org.apache.hama.mapred;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapred.TableInputFormat;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hama.io.VectorDatum;
+
+@SuppressWarnings("unchecked")
+public abstract class MatrixMap<K extends WritableComparable, V extends Writable>
+ extends MapReduceBase implements Mapper<ImmutableBytesWritable, VectorDatum, K, V> {
+ /**
+ * Use this before submitting a MatrixMap job. It will
+ * appropriately set up the JobConf: input format, mapper class,
+ * map output key/value classes, input path and column list.
+ *
+ * @param table table name
+ * @param columns columns to scan
+ * @param mapper mapper class
+ * @param outputKeyClass the map task's output key class
+ * @param outputValueClass the map task's output value class
+ * @param job job configuration
+ */
+ public static void initJob(String table, String columns,
+ Class<? extends MatrixMap> mapper,
+ Class<? extends WritableComparable> outputKeyClass,
+ Class<? extends Writable> outputValueClass, JobConf job) {
+
+ job.setInputFormat(MatrixInputFormat.class);
+ job.setMapOutputValueClass(outputValueClass);
+ job.setMapOutputKeyClass(outputKeyClass);
+ job.setMapperClass(mapper);
+ // The table name doubles as the input path for the input format.
+ FileInputFormat.addInputPaths(job, table);
+ job.set(TableInputFormat.COLUMN_LIST, columns);
+ }
+
+ /**
+ * Call a user defined function on a single HBase record, represented
+ * by a key and its associated record value.
+ *
+ * @param key row key of the record
+ * @param value the row's columns wrapped in a {@link VectorDatum}
+ * @param output collector for the map task's output pairs
+ * @param reporter facility to report progress
+ * @throws IOException
+ */
+ public abstract void map(ImmutableBytesWritable key, VectorDatum value,
+ OutputCollector<K, V> output, Reporter reporter) throws IOException;
+}
Added: incubator/hama/trunk/src/java/org/apache/hama/mapred/MatrixReduce.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/mapred/MatrixReduce.java?rev=680652&view=auto
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/mapred/MatrixReduce.java (added)
+++ incubator/hama/trunk/src/java/org/apache/hama/mapred/MatrixReduce.java Tue Jul 29 02:33:18 2008
@@ -0,0 +1,48 @@
+package org.apache.hama.mapred;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.hadoop.hbase.io.BatchUpdate;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapred.TableOutputFormat;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+
+@SuppressWarnings("unchecked")
+public abstract class MatrixReduce<K extends WritableComparable, V extends Writable>
+ extends MapReduceBase implements Reducer<K, V, ImmutableBytesWritable, BatchUpdate> {
+ /**
+ * Use this before submitting a MatrixReduce job. It will
+ * appropriately set up the JobConf: output format, reducer class,
+ * output table name and output key/value classes.
+ *
+ * @param table output table name
+ * @param reducer reducer class
+ * @param job job configuration
+ */
+ public static void initJob(String table,
+ Class<? extends MatrixReduce> reducer, JobConf job) {
+ job.setOutputFormat(TableOutputFormat.class);
+ job.setReducerClass(reducer);
+ job.set(TableOutputFormat.OUTPUT_TABLE, table);
+ job.setOutputKeyClass(ImmutableBytesWritable.class);
+ job.setOutputValueClass(BatchUpdate.class);
+ }
+
+ /**
+ * Called once per key with all values grouped under that key;
+ * implementations emit {@link BatchUpdate}s keyed by row.
+ *
+ * @param key row key the values are grouped under
+ * @param values all values produced for this key by the map tasks
+ * @param output collector for row-key / BatchUpdate output pairs
+ * @param reporter facility to report progress
+ * @throws IOException
+ */
+ public abstract void reduce(K key, Iterator<V> values,
+ OutputCollector<ImmutableBytesWritable, BatchUpdate> output, Reporter reporter)
+ throws IOException;
+}
\ No newline at end of file
Added: incubator/hama/trunk/src/test/org/apache/hama/TestVector.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/test/org/apache/hama/TestVector.java?rev=680652&view=auto
==============================================================================
--- incubator/hama/trunk/src/test/org/apache/hama/TestVector.java (added)
+++ incubator/hama/trunk/src/test/org/apache/hama/TestVector.java Tue Jul 29 02:33:18 2008
@@ -0,0 +1,57 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama;
+
+import org.apache.hadoop.io.Text;
+
+/**
+ * Tests {@link Vector} operations backed by a {@link Matrix}.
+ */
+public class TestVector extends HamaTestCase {
+
+ /**
+ * Test cosine similarity between two row vectors of a matrix.
+ */
+ public void testCosine() {
+ final double result = 0.6978227007909176;
+ Matrix m1 = new Matrix(conf, new Text("cosine"));
+
+ // TODO : We need setArray(int row, double[] value) to matrix
+ // e.g. matrixA.setArray(0, new double[] {2,5,1,4});
+ // -- Edward
+
+ m1.set(0, 0, 2);
+ m1.set(0, 1, 5);
+ m1.set(0, 2, 1);
+ m1.set(0, 3, 4);
+
+ m1.set(1, 0, 4);
+ m1.set(1, 1, 1);
+ m1.set(1, 2, 3);
+ m1.set(1, 3, 3);
+
+ LOG.info("get test : " + m1.get(0, 0));
+ LOG.info("get test : " + m1.get(0, 1));
+
+ Vector v1 = new Vector(m1.getRowResult(0));
+ Vector v2 = new Vector(m1.getRowResult(1));
+
+ double cos = v1.getCosine(v2);
+ // Compare doubles with an explicit tolerance and expected-value-first
+ // argument order; the no-delta assertEquals relies on exact
+ // floating-point equality via autoboxing and is fragile.
+ assertEquals(result, cos, 0.000001);
+ m1.close();
+ }
+}
Modified: incubator/hama/trunk/src/test/org/apache/hama/mapred/TestMatrixMapReduce.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/test/org/apache/hama/mapred/TestMatrixMapReduce.java?rev=680652&r1=680651&r2=680652&view=diff
==============================================================================
--- incubator/hama/trunk/src/test/org/apache/hama/mapred/TestMatrixMapReduce.java (original)
+++ incubator/hama/trunk/src/test/org/apache/hama/mapred/TestMatrixMapReduce.java Tue Jul 29 02:33:18 2008
@@ -27,17 +27,15 @@
import org.apache.hadoop.hbase.io.BatchUpdate;
import org.apache.hadoop.hbase.io.Cell;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.io.RowResult;
-import org.apache.hadoop.hbase.mapred.TableMap;
-import org.apache.hadoop.hbase.mapred.TableReduce;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
-import org.apache.hama.FeatureVector;
+import org.apache.hama.Vector;
import org.apache.hama.HamaTestCase;
import org.apache.hama.Matrix;
+import org.apache.hama.io.VectorDatum;
import org.apache.log4j.Logger;
/**
@@ -45,8 +43,6 @@
*/
public class TestMatrixMapReduce extends HamaTestCase {
static final Logger LOG = Logger.getLogger(TestMatrixMapReduce.class);
- protected Matrix a;
- protected Matrix b;
/** constructor */
public TestMatrixMapReduce() {
@@ -54,39 +50,35 @@
}
public static class AdditionMap extends
- TableMap<ImmutableBytesWritable, RowResult> {
- protected Matrix matrix_b;
+ MatrixMap<ImmutableBytesWritable, VectorDatum> {
+ protected Matrix B;
public static final String MATRIX_B = "hama.addition.substraction.matrix.b";
public void configure(JobConf job) {
- matrix_b = new Matrix(new HBaseConfiguration(), new Text("MatrixB"));
+ B = new Matrix(new HBaseConfiguration(), new Text("MatrixB"));
}
@Override
- public void map(ImmutableBytesWritable key, RowResult value,
- OutputCollector<ImmutableBytesWritable, RowResult> output,
+ public void map(ImmutableBytesWritable key, VectorDatum value,
+ OutputCollector<ImmutableBytesWritable, VectorDatum> output,
Reporter reporter) throws IOException {
- FeatureVector v1 = new FeatureVector(matrix_b.getRowResult(key.get()));
- FeatureVector v2 = new FeatureVector(value);
- FeatureVector v3 = v1.addition(v2);
- output.collect(key, v3.getRowResult(key.get()));
-
- LOG.info("xxx" + v3.getValueAt(0));
- LOG.info("xxx" + v3.getValueAt(1));
+ Vector v1 = new Vector(B.getRowResult(key.get()));
+ Vector v2 = value.getVector();
+ output.collect(key, v1.addition(key.get(), v2));
}
}
public static class AdditionReduce extends
- TableReduce<ImmutableBytesWritable, RowResult> {
+ MatrixReduce<ImmutableBytesWritable, VectorDatum> {
@Override
- public void reduce(ImmutableBytesWritable key, Iterator<RowResult> values,
+ public void reduce(ImmutableBytesWritable key, Iterator<VectorDatum> values,
OutputCollector<ImmutableBytesWritable, BatchUpdate> output,
Reporter reporter) throws IOException {
BatchUpdate b = new BatchUpdate(key.get());
- RowResult r = values.next();
+ VectorDatum r = values.next();
for (Map.Entry<byte[], Cell> f : r.entrySet()) {
b.put(f.getKey(), f.getValue().getValue());
}
@@ -96,16 +88,14 @@
}
public void testMatrixMapReduce() throws IOException {
- a = new Matrix(conf, new Text("MatrixA"));
+ Matrix a = new Matrix(conf, new Text("MatrixA"));
a.set(0, 0, 1);
a.set(0, 1, 0);
- b = new Matrix(conf, new Text("MatrixB"));
+ Matrix b = new Matrix(conf, new Text("MatrixB"));
b.set(0, 0, 1);
b.set(0, 1, 1);
-
a.close();
b.close();
-
miniMRJob();
}
@@ -116,9 +106,9 @@
JobConf jobConf = new JobConf(conf, TestMatrixMapReduce.class);
jobConf.setJobName("test MR job");
- TableMap.initJob("MatrixA", "column:", AdditionMap.class,
- ImmutableBytesWritable.class, RowResult.class, jobConf);
- TableReduce.initJob("xanadu", AdditionReduce.class, jobConf);
+ MatrixMap.initJob("MatrixA", "column:", AdditionMap.class,
+ ImmutableBytesWritable.class, VectorDatum.class, jobConf);
+ MatrixReduce.initJob("xanadu", AdditionReduce.class, jobConf);
jobConf.setNumMapTasks(1);
jobConf.setNumReduceTasks(1);