You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by ed...@apache.org on 2008/08/26 07:42:12 UTC
svn commit: r688966 - in /incubator/hama/trunk: ./
src/java/org/apache/hama/algebra/ src/java/org/apache/hama/mapred/
src/test/org/apache/hama/mapred/
Author: edwardyoon
Date: Mon Aug 25 22:42:11 2008
New Revision: 688966
URL: http://svn.apache.org/viewvc?rev=688966&view=rev
Log:
Replace ImmutableBytesWritable with IntWritable.
Added:
incubator/hama/trunk/src/java/org/apache/hama/mapred/MatrixOutputFormat.java
Modified:
incubator/hama/trunk/CHANGES.txt
incubator/hama/trunk/src/java/org/apache/hama/algebra/AdditionMap.java
incubator/hama/trunk/src/java/org/apache/hama/algebra/AdditionReduce.java
incubator/hama/trunk/src/java/org/apache/hama/mapred/DenseMap.java
incubator/hama/trunk/src/java/org/apache/hama/mapred/MatrixInputFormatBase.java
incubator/hama/trunk/src/java/org/apache/hama/mapred/MatrixReduce.java
incubator/hama/trunk/src/test/org/apache/hama/mapred/TestMatrixMapReduce.java
Modified: incubator/hama/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/CHANGES.txt?rev=688966&r1=688965&r2=688966&view=diff
==============================================================================
--- incubator/hama/trunk/CHANGES.txt (original)
+++ incubator/hama/trunk/CHANGES.txt Mon Aug 25 22:42:11 2008
@@ -19,6 +19,7 @@
IMPROVEMENTS
+ HAMA-42: Replace ImmutableBytesWritable to IntWritable (edwardyoon)
HAMA-44: Remove findbugs warnings (edwardyoon)
HAMA-40: Rename MatrixInterface to Matrix (edwardyoon)
HAMA-14: Using Java 6 (edwardyoon)
Modified: incubator/hama/trunk/src/java/org/apache/hama/algebra/AdditionMap.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/algebra/AdditionMap.java?rev=688966&r1=688965&r2=688966&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/algebra/AdditionMap.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/algebra/AdditionMap.java Mon Aug 25 22:42:11 2008
@@ -21,22 +21,21 @@
import java.io.IOException;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hama.DenseVector;
import org.apache.hama.Vector;
import org.apache.hama.mapred.DenseMap;
-import org.apache.hama.util.Numeric;
-public class AdditionMap extends DenseMap<ImmutableBytesWritable, DenseVector> {
+public class AdditionMap extends DenseMap<IntWritable, DenseVector> {
@Override
- public void map(ImmutableBytesWritable key, DenseVector value,
- OutputCollector<ImmutableBytesWritable, DenseVector> output,
+ public void map(IntWritable key, DenseVector value,
+ OutputCollector<IntWritable, DenseVector> output,
Reporter reporter) throws IOException {
- Vector v1 = MATRIX_B.getRow(Numeric.bytesToInt(key.get()));
+ Vector v1 = MATRIX_B.getRow(key.get());
output.collect(key, (DenseVector) v1.add(value));
}
Modified: incubator/hama/trunk/src/java/org/apache/hama/algebra/AdditionReduce.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/algebra/AdditionReduce.java?rev=688966&r1=688965&r2=688966&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/algebra/AdditionReduce.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/algebra/AdditionReduce.java Mon Aug 25 22:42:11 2008
@@ -25,21 +25,21 @@
import org.apache.hadoop.hbase.io.BatchUpdate;
import org.apache.hadoop.hbase.io.Cell;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hama.DenseVector;
import org.apache.hama.mapred.MatrixReduce;
+import org.apache.hama.util.Numeric;
-public class AdditionReduce extends
- MatrixReduce<ImmutableBytesWritable, DenseVector> {
+public class AdditionReduce extends MatrixReduce<IntWritable, DenseVector> {
@Override
- public void reduce(ImmutableBytesWritable key, Iterator<DenseVector> values,
- OutputCollector<ImmutableBytesWritable, BatchUpdate> output,
- Reporter reporter) throws IOException {
+ public void reduce(IntWritable key, Iterator<DenseVector> values,
+ OutputCollector<IntWritable, BatchUpdate> output, Reporter reporter)
+ throws IOException {
- BatchUpdate b = new BatchUpdate(key.get());
+ BatchUpdate b = new BatchUpdate(Numeric.intToBytes(key.get()));
DenseVector vector = values.next();
for (Map.Entry<byte[], Cell> f : vector.entrySet()) {
b.put(f.getKey(), f.getValue().getValue());
Modified: incubator/hama/trunk/src/java/org/apache/hama/mapred/DenseMap.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/mapred/DenseMap.java?rev=688966&r1=688965&r2=688966&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/mapred/DenseMap.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/mapred/DenseMap.java Mon Aug 25 22:42:11 2008
@@ -21,7 +21,7 @@
import java.io.IOException;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.FileInputFormat;
@@ -39,7 +39,7 @@
@SuppressWarnings("unchecked")
public abstract class DenseMap<K extends WritableComparable, V extends Writable>
extends MapReduceBase implements
- Mapper<ImmutableBytesWritable, DenseVector, K, V> {
+ Mapper<IntWritable, DenseVector, K, V> {
protected static Matrix MATRIX_B;
public static void initJob(String matrixA, String matrixB,
@@ -57,6 +57,6 @@
job.set(MatrixInputFormat.COLUMN_LIST, Constants.COLUMN);
}
- public abstract void map(ImmutableBytesWritable key, DenseVector value,
+ public abstract void map(IntWritable key, DenseVector value,
OutputCollector<K, V> output, Reporter reporter) throws IOException;
}
Modified: incubator/hama/trunk/src/java/org/apache/hama/mapred/MatrixInputFormatBase.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/mapred/MatrixInputFormatBase.java?rev=688966&r1=688965&r2=688966&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/mapred/MatrixInputFormatBase.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/mapred/MatrixInputFormatBase.java Mon Aug 25 22:42:11 2008
@@ -31,20 +31,21 @@
import org.apache.hadoop.hbase.filter.RowFilterInterface;
import org.apache.hadoop.hbase.filter.RowFilterSet;
import org.apache.hadoop.hbase.filter.StopRowFilter;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.io.RowResult;
import org.apache.hadoop.hbase.mapred.TableSplit;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.util.Writables;
+import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hama.DenseVector;
+import org.apache.hama.util.Numeric;
public abstract class MatrixInputFormatBase implements
- InputFormat<ImmutableBytesWritable, DenseVector> {
+ InputFormat<IntWritable, DenseVector> {
private final Log LOG = LogFactory.getLog(MatrixInputFormatBase.class);
private byte[][] inputColumns;
private HTable table;
@@ -55,7 +56,7 @@
* Iterate over HBase table data, returning (IntWritable, DenseVector) pairs
*/
protected static class TableRecordReader implements
- RecordReader<ImmutableBytesWritable, DenseVector> {
+ RecordReader<IntWritable, DenseVector> {
private byte[] startRow;
private byte[] endRow;
private RowFilterInterface trrRowFilter;
@@ -136,8 +137,8 @@
*
* @see org.apache.hadoop.mapred.RecordReader#createKey()
*/
- public ImmutableBytesWritable createKey() {
- return new ImmutableBytesWritable();
+ public IntWritable createKey() {
+ return new IntWritable();
}
/**
@@ -172,12 +173,12 @@
* @throws IOException
*/
@SuppressWarnings("unchecked")
- public boolean next(ImmutableBytesWritable key, DenseVector value)
+ public boolean next(IntWritable key, DenseVector value)
throws IOException {
RowResult result = this.scanner.next();
boolean hasMore = result != null && result.size() > 0;
if (hasMore) {
- key.set(result.getRow());
+ key.set(Numeric.bytesToInt(result.getRow()));
Writables.copyWritable(result, value);
}
return hasMore;
@@ -191,7 +192,7 @@
* @see org.apache.hadoop.mapred.InputFormat#getRecordReader(InputSplit,
* JobConf, Reporter)
*/
- public RecordReader<ImmutableBytesWritable, DenseVector> getRecordReader(
+ public RecordReader<IntWritable, DenseVector> getRecordReader(
InputSplit split, @SuppressWarnings("unused") JobConf job,
@SuppressWarnings("unused") Reporter reporter) throws IOException {
TableSplit tSplit = (TableSplit) split;
Added: incubator/hama/trunk/src/java/org/apache/hama/mapred/MatrixOutputFormat.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/mapred/MatrixOutputFormat.java?rev=688966&view=auto
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/mapred/MatrixOutputFormat.java (added)
+++ incubator/hama/trunk/src/java/org/apache/hama/mapred/MatrixOutputFormat.java Mon Aug 25 22:42:11 2008
@@ -0,0 +1,90 @@
+package org.apache.hama.mapred;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.io.BatchUpdate;
+import org.apache.hadoop.hbase.mapred.TableOutputFormat;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.mapred.FileAlreadyExistsException;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.InvalidJobConfException;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordWriter;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.util.Progressable;
+
+public class MatrixOutputFormat extends
+ FileOutputFormat<IntWritable, BatchUpdate> {
+ // Output format whose record writer commits each BatchUpdate value to the HBase table named by OUTPUT_TABLE.
+ /** Name of the JobConf property that holds the output table name. */
+ public static final String OUTPUT_TABLE = "hbase.mapred.outputtable";
+ private final Log LOG = LogFactory.getLog(TableOutputFormat.class); // NOTE(review): logger is created for TableOutputFormat, not MatrixOutputFormat -- confirm this is intended
+
+ /**
+ * Record writer that commits each (IntWritable, BatchUpdate) output pair's
+ * BatchUpdate to an HBase table; the key itself is not used.
+ */
+ protected static class TableRecordWriter implements
+ RecordWriter<IntWritable, BatchUpdate> {
+ private HTable m_table;
+
+ /**
+ * Creates a writer that commits updates to the given HBase table.
+ *
+ * @param table the HTable that receives every BatchUpdate passed to write()
+ */
+ public TableRecordWriter(HTable table) {
+ m_table = table;
+ }
+
+ /** {@inheritDoc} This implementation does nothing. */
+ public void close(@SuppressWarnings("unused")
+ Reporter reporter) {
+ // Intentionally empty: no action is taken on the table here.
+ }
+
+ /** {@inheritDoc} The key is ignored; only {@code value} is committed to the table. */
+ public void write(@SuppressWarnings("unused")
+ IntWritable key, BatchUpdate value) throws IOException {
+ m_table.commit(value);
+ }
+ }
+
+ /** {@inheritDoc} Opens the table named by the OUTPUT_TABLE job property and returns a TableRecordWriter for it. */
+ @Override
+ @SuppressWarnings("unchecked")
+ public RecordWriter getRecordWriter(@SuppressWarnings("unused")
+ FileSystem ignored, JobConf job, @SuppressWarnings("unused")
+ String name, @SuppressWarnings("unused")
+ Progressable progress) throws IOException {
+
+ // The table name is read from the job configuration; the file system, name and progress arguments are unused.
+
+ String tableName = job.get(OUTPUT_TABLE);
+ HTable table = null;
+ try {
+ table = new HTable(new HBaseConfiguration(job), tableName);
+ } catch (IOException e) {
+ LOG.error(e); // log the failure to open the table, then rethrow it unchanged
+ throw e;
+ }
+ return new TableRecordWriter(table);
+ }
+
+ /** {@inheritDoc} Only checks that the OUTPUT_TABLE job property is set; throws IOException when it is missing. */
+ @Override
+ @SuppressWarnings("unused")
+ public void checkOutputSpecs(FileSystem ignored, JobConf job)
+ throws FileAlreadyExistsException, InvalidJobConfException, IOException {
+
+ String tableName = job.get(OUTPUT_TABLE);
+ if (tableName == null) {
+ throw new IOException("Must specify table name");
+ }
+ }
+}
Modified: incubator/hama/trunk/src/java/org/apache/hama/mapred/MatrixReduce.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/mapred/MatrixReduce.java?rev=688966&r1=688965&r2=688966&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/mapred/MatrixReduce.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/mapred/MatrixReduce.java Mon Aug 25 22:42:11 2008
@@ -23,8 +23,7 @@
import java.util.Iterator;
import org.apache.hadoop.hbase.io.BatchUpdate;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.mapred.TableOutputFormat;
+import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.JobConf;
@@ -35,7 +34,7 @@
@SuppressWarnings("unchecked")
public abstract class MatrixReduce<K extends WritableComparable, V extends Writable>
- extends MapReduceBase implements Reducer<K, V, ImmutableBytesWritable, BatchUpdate> {
+ extends MapReduceBase implements Reducer<K, V, IntWritable, BatchUpdate> {
/**
* Use this before submitting a TableReduce job. It will
* appropriately set up the JobConf.
@@ -46,10 +45,10 @@
*/
public static void initJob(String table,
Class<? extends MatrixReduce> reducer, JobConf job) {
- job.setOutputFormat(TableOutputFormat.class);
+ job.setOutputFormat(MatrixOutputFormat.class);
job.setReducerClass(reducer);
- job.set(TableOutputFormat.OUTPUT_TABLE, table);
- job.setOutputKeyClass(ImmutableBytesWritable.class);
+ job.set(MatrixOutputFormat.OUTPUT_TABLE, table);
+ job.setOutputKeyClass(IntWritable.class);
job.setOutputValueClass(BatchUpdate.class);
}
@@ -62,6 +61,6 @@
* @throws IOException
*/
public abstract void reduce(K key, Iterator<V> values,
- OutputCollector<ImmutableBytesWritable, BatchUpdate> output, Reporter reporter)
+ OutputCollector<IntWritable, BatchUpdate> output, Reporter reporter)
throws IOException;
}
\ No newline at end of file
Modified: incubator/hama/trunk/src/test/org/apache/hama/mapred/TestMatrixMapReduce.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/test/org/apache/hama/mapred/TestMatrixMapReduce.java?rev=688966&r1=688965&r2=688966&view=diff
==============================================================================
--- incubator/hama/trunk/src/test/org/apache/hama/mapred/TestMatrixMapReduce.java (original)
+++ incubator/hama/trunk/src/test/org/apache/hama/mapred/TestMatrixMapReduce.java Mon Aug 25 22:42:11 2008
@@ -21,7 +21,7 @@
import java.io.IOException;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hama.DenseMatrix;
@@ -64,7 +64,7 @@
JobConf jobConf = new JobConf(conf, TestMatrixMapReduce.class);
jobConf.setJobName("test MR job");
- DenseMap.initJob(A, B, AdditionMap.class, ImmutableBytesWritable.class,
+ DenseMap.initJob(A, B, AdditionMap.class, IntWritable.class,
DenseVector.class, jobConf);
MatrixReduce.initJob(output, AdditionReduce.class, jobConf);
@@ -76,5 +76,4 @@
assertEquals(c.get(0, 0), 2.0);
assertEquals(c.get(0, 1), 1.0);
}
-
}