You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by ed...@apache.org on 2008/11/06 07:36:15 UTC
svn commit: r711778 - in /incubator/hama/trunk/src: java/org/apache/hama/
java/org/apache/hama/algebra/ java/org/apache/hama/mapred/
test/org/apache/hama/mapred/
Author: edwardyoon
Date: Wed Nov 5 22:36:14 2008
New Revision: 711778
URL: http://svn.apache.org/viewvc?rev=711778&view=rev
Log:
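Renaming: DenseMap -> RowCyclicMap, MatrixReduce -> RowCyclicReduce, MatrixInputFormat -> VectorInputFormat, MatrixInputFormatBase -> VectorInputFormatBase, MatrixOutputFormat -> VectorOutputFormat.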
Added:
incubator/hama/trunk/src/java/org/apache/hama/mapred/RowCyclicMap.java
incubator/hama/trunk/src/java/org/apache/hama/mapred/RowCyclicReduce.java
incubator/hama/trunk/src/java/org/apache/hama/mapred/VectorInputFormat.java
incubator/hama/trunk/src/java/org/apache/hama/mapred/VectorInputFormatBase.java
incubator/hama/trunk/src/java/org/apache/hama/mapred/VectorOutputFormat.java
Removed:
incubator/hama/trunk/src/java/org/apache/hama/mapred/DenseMap.java
incubator/hama/trunk/src/java/org/apache/hama/mapred/MatrixInputFormat.java
incubator/hama/trunk/src/java/org/apache/hama/mapred/MatrixInputFormatBase.java
incubator/hama/trunk/src/java/org/apache/hama/mapred/MatrixOutputFormat.java
incubator/hama/trunk/src/java/org/apache/hama/mapred/MatrixReduce.java
Modified:
incubator/hama/trunk/src/java/org/apache/hama/DenseMatrix.java
incubator/hama/trunk/src/java/org/apache/hama/algebra/Add1DLayoutMap.java
incubator/hama/trunk/src/java/org/apache/hama/algebra/Add1DLayoutReduce.java
incubator/hama/trunk/src/java/org/apache/hama/algebra/Mult1DLayoutMap.java
incubator/hama/trunk/src/java/org/apache/hama/algebra/Mult1DLayoutReduce.java
incubator/hama/trunk/src/test/org/apache/hama/mapred/TestMatrixMapReduce.java
Modified: incubator/hama/trunk/src/java/org/apache/hama/DenseMatrix.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/DenseMatrix.java?rev=711778&r1=711777&r2=711778&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/DenseMatrix.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/DenseMatrix.java Wed Nov 5 22:36:14 2008
@@ -38,7 +38,7 @@
import org.apache.hama.io.VectorMapWritable;
import org.apache.hama.io.VectorUpdate;
import org.apache.hama.io.VectorWritable;
-import org.apache.hama.mapred.MatrixReduce;
+import org.apache.hama.mapred.RowCyclicReduce;
import org.apache.hama.util.JobManager;
import org.apache.hama.util.Numeric;
import org.apache.hama.util.RandomVariable;
@@ -255,7 +255,7 @@
Add1DLayoutMap.initJob(this.getPath(), B.getPath(), Add1DLayoutMap.class,
IntWritable.class, VectorWritable.class, jobConf);
- MatrixReduce.initJob(result.getPath(), Add1DLayoutReduce.class, jobConf);
+ RowCyclicReduce.initJob(result.getPath(), Add1DLayoutReduce.class, jobConf);
JobManager.execute(jobConf, result);
return result;
@@ -305,7 +305,7 @@
Mult1DLayoutMap.initJob(this.getPath(), B.getPath(), Mult1DLayoutMap.class,
IntWritable.class, VectorWritable.class, jobConf);
- MatrixReduce.initJob(result.getPath(), Mult1DLayoutReduce.class, jobConf);
+ RowCyclicReduce.initJob(result.getPath(), Mult1DLayoutReduce.class, jobConf);
JobManager.execute(jobConf, result);
return result;
}
Modified: incubator/hama/trunk/src/java/org/apache/hama/algebra/Add1DLayoutMap.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/algebra/Add1DLayoutMap.java?rev=711778&r1=711777&r2=711778&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/algebra/Add1DLayoutMap.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/algebra/Add1DLayoutMap.java Wed Nov 5 22:36:14 2008
@@ -31,10 +31,10 @@
import org.apache.hama.Matrix;
import org.apache.hama.Vector;
import org.apache.hama.io.VectorWritable;
-import org.apache.hama.mapred.DenseMap;
+import org.apache.hama.mapred.RowCyclicMap;
import org.apache.log4j.Logger;
-public class Add1DLayoutMap extends DenseMap<IntWritable, VectorWritable> {
+public class Add1DLayoutMap extends RowCyclicMap<IntWritable, VectorWritable> {
static final Logger LOG = Logger.getLogger(Add1DLayoutMap.class);
protected Matrix matrix_b;
public static final String MATRIX_B = "hama.addition.matrix.b";
Modified: incubator/hama/trunk/src/java/org/apache/hama/algebra/Add1DLayoutReduce.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/algebra/Add1DLayoutReduce.java?rev=711778&r1=711777&r2=711778&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/algebra/Add1DLayoutReduce.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/algebra/Add1DLayoutReduce.java Wed Nov 5 22:36:14 2008
@@ -29,9 +29,9 @@
import org.apache.hama.io.VectorEntry;
import org.apache.hama.io.VectorUpdate;
import org.apache.hama.io.VectorWritable;
-import org.apache.hama.mapred.MatrixReduce;
+import org.apache.hama.mapred.RowCyclicReduce;
-public class Add1DLayoutReduce extends MatrixReduce<IntWritable, VectorWritable> {
+public class Add1DLayoutReduce extends RowCyclicReduce<IntWritable, VectorWritable> {
@Override
public void reduce(IntWritable key, Iterator<VectorWritable> values,
Modified: incubator/hama/trunk/src/java/org/apache/hama/algebra/Mult1DLayoutMap.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/algebra/Mult1DLayoutMap.java?rev=711778&r1=711777&r2=711778&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/algebra/Mult1DLayoutMap.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/algebra/Mult1DLayoutMap.java Wed Nov 5 22:36:14 2008
@@ -31,13 +31,13 @@
import org.apache.hama.Matrix;
import org.apache.hama.Vector;
import org.apache.hama.io.VectorWritable;
-import org.apache.hama.mapred.DenseMap;
+import org.apache.hama.mapred.RowCyclicMap;
import org.apache.log4j.Logger;
/**
* 1D Block Layout version
*/
-public class Mult1DLayoutMap extends DenseMap<IntWritable, VectorWritable> {
+public class Mult1DLayoutMap extends RowCyclicMap<IntWritable, VectorWritable> {
static final Logger LOG = Logger.getLogger(Mult1DLayoutMap.class);
protected Matrix matrix_b;
public static final String MATRIX_B = "hama.multiplication.matrix.b";
Modified: incubator/hama/trunk/src/java/org/apache/hama/algebra/Mult1DLayoutReduce.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/algebra/Mult1DLayoutReduce.java?rev=711778&r1=711777&r2=711778&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/algebra/Mult1DLayoutReduce.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/algebra/Mult1DLayoutReduce.java Wed Nov 5 22:36:14 2008
@@ -29,11 +29,11 @@
import org.apache.hadoop.mapred.Reporter;
import org.apache.hama.io.VectorUpdate;
import org.apache.hama.io.VectorWritable;
-import org.apache.hama.mapred.MatrixReduce;
+import org.apache.hama.mapred.RowCyclicReduce;
import org.apache.log4j.Logger;
public class Mult1DLayoutReduce extends
- MatrixReduce<IntWritable, VectorWritable> {
+ RowCyclicReduce<IntWritable, VectorWritable> {
static final Logger LOG = Logger.getLogger(Mult1DLayoutReduce.class);
public static final Map<Integer, Double> buffer = new HashMap<Integer, Double>();
Added: incubator/hama/trunk/src/java/org/apache/hama/mapred/RowCyclicMap.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/mapred/RowCyclicMap.java?rev=711778&view=auto
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/mapred/RowCyclicMap.java (added)
+++ incubator/hama/trunk/src/java/org/apache/hama/mapred/RowCyclicMap.java Wed Nov 5 22:36:14 2008
@@ -0,0 +1,56 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.mapred;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hama.Constants;
+import org.apache.hama.Matrix;
+import org.apache.hama.io.VectorWritable;
+
+/**
+ * Common base for mappers that walk a matrix one row at a time.
+ *
+ * <p>Each input record pairs a row key ({@link IntWritable}) with the contents
+ * of that row ({@link VectorWritable}); concrete subclasses decide what to emit.
+ *
+ * @param <K> key type written to the {@link OutputCollector}
+ * @param <V> value type written to the {@link OutputCollector}
+ */
+// Raw WritableComparable bound and raw Class<? extends RowCyclicMap> below.
+@SuppressWarnings("unchecked")
+public abstract class RowCyclicMap<K extends WritableComparable, V extends Writable>
+    extends MapReduceBase
+    implements Mapper<IntWritable, VectorWritable, K, V> {
+
+  // NOTE(review): public, mutable, and not referenced anywhere in this class;
+  // kept only so existing callers keep compiling.
+  public static Matrix MATRIX_B;
+
+  /**
+   * Prepares {@code job} to run {@code mapper} over the rows of a matrix.
+   *
+   * <p>Adds {@code matrixA} as a job input path, stores {@link Constants#COLUMN}
+   * as the column list under {@link VectorInputFormat#COLUMN_LIST}, and selects
+   * {@link VectorInputFormat} as the input format.
+   *
+   * @param matrixA input path; {@link VectorInputFormat} uses the path's name
+   *        as the name of the table to scan
+   * @param mapper concrete mapper class to run
+   * @param job job configuration, modified in place
+   */
+  public static void initJob(String matrixA,
+      Class<? extends RowCyclicMap> mapper, JobConf job) {
+    FileInputFormat.addInputPaths(job, matrixA);
+    job.set(VectorInputFormat.COLUMN_LIST, Constants.COLUMN);
+
+    job.setInputFormat(VectorInputFormat.class);
+    job.setMapperClass(mapper);
+  }
+
+  /**
+   * Handles one matrix row.
+   *
+   * @param key row key of the current record
+   * @param value contents of that row
+   * @param output sink for the (K, V) pairs produced by the implementation
+   * @param reporter progress/status reporter
+   * @throws IOException propagated from the implementation
+   */
+  public abstract void map(IntWritable key, VectorWritable value,
+      OutputCollector<K, V> output, Reporter reporter) throws IOException;
+}
Added: incubator/hama/trunk/src/java/org/apache/hama/mapred/RowCyclicReduce.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/mapred/RowCyclicReduce.java?rev=711778&view=auto
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/mapred/RowCyclicReduce.java (added)
+++ incubator/hama/trunk/src/java/org/apache/hama/mapred/RowCyclicReduce.java Wed Nov 5 22:36:14 2008
@@ -0,0 +1,67 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.mapred;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.hadoop.hbase.io.BatchUpdate;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hama.io.VectorUpdate;
+
+/**
+ * Common base for reducers whose output is written back to an HBase table as
+ * ({@link IntWritable}, {@link VectorUpdate}) pairs through
+ * {@link VectorOutputFormat}.
+ *
+ * @param <K> key type received from the map phase
+ * @param <V> value type received from the map phase
+ */
+// Raw WritableComparable bound and raw Class<? extends RowCyclicReduce> below.
+@SuppressWarnings("unchecked")
+public abstract class RowCyclicReduce<K extends WritableComparable, V extends Writable>
+    extends MapReduceBase
+    implements Reducer<K, V, IntWritable, VectorUpdate> {
+
+  /**
+   * Configures {@code job} so that {@code reducer} writes its results into
+   * {@code table} via {@link VectorOutputFormat}. Call this before submitting
+   * the job.
+   *
+   * @param table name of the output table, stored under
+   *        {@link VectorOutputFormat#OUTPUT_TABLE}
+   * @param reducer concrete reducer class to run
+   * @param job job configuration, modified in place
+   */
+  public static void initJob(String table,
+      Class<? extends RowCyclicReduce> reducer, JobConf job) {
+    // NOTE(review): the declared output value class is BatchUpdate, while
+    // reducers emit VectorUpdate (VectorOutputFormat commits
+    // value.getBatchUpdate()); confirm which class is intended here.
+    job.setOutputKeyClass(IntWritable.class);
+    job.setOutputValueClass(BatchUpdate.class);
+
+    job.set(VectorOutputFormat.OUTPUT_TABLE, table);
+    job.setOutputFormat(VectorOutputFormat.class);
+    job.setReducerClass(reducer);
+  }
+
+  /**
+   * Handles all values grouped under one key.
+   *
+   * @param key the grouping key from the map phase
+   * @param values every value emitted for {@code key}
+   * @param output sink for (row key, row update) results
+   * @param reporter progress/status reporter
+   * @throws IOException propagated from the implementation
+   */
+  public abstract void reduce(K key, Iterator<V> values,
+      OutputCollector<IntWritable, VectorUpdate> output, Reporter reporter)
+      throws IOException;
+}
Added: incubator/hama/trunk/src/java/org/apache/hama/mapred/VectorInputFormat.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/mapred/VectorInputFormat.java?rev=711778&view=auto
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/mapred/VectorInputFormat.java (added)
+++ incubator/hama/trunk/src/java/org/apache/hama/mapred/VectorInputFormat.java Wed Nov 5 22:36:14 2008
@@ -0,0 +1,74 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.mapred;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobConfigurable;
+
+/**
+ * {@link VectorInputFormatBase} configured from a {@link JobConf}.
+ *
+ * <p>The name of the job's single input path is used as the HBase table to
+ * scan. The columns to read come from the {@link #COLUMN_LIST} property.
+ */
+public class VectorInputFormat extends VectorInputFormatBase implements
+    JobConfigurable {
+  private static final Log LOG = LogFactory.getLog(VectorInputFormat.class);
+
+  /**
+   * Job property holding a space-delimited list of the columns to read.
+   */
+  public static final String COLUMN_LIST = "hama.mapred.tablecolumns";
+
+  /**
+   * Reads the column list and the input table from {@code job}.
+   *
+   * <p>This method cannot throw checked exceptions. Configuration problems are
+   * therefore logged and the affected setting is left unset.
+   * {@link #validateInput(JobConf)} reports a missing table path or column list
+   * as an {@link IOException}.
+   *
+   * @param job the job configuration
+   */
+  public void configure(JobConf job) {
+    String colArg = job.get(COLUMN_LIST);
+    if (colArg == null || colArg.length() == 0) {
+      LOG.error("No input columns configured under " + COLUMN_LIST);
+    } else {
+      String[] colNames = colArg.split(" ");
+      byte[][] columns = new byte[colNames.length][];
+      for (int i = 0; i < columns.length; i++) {
+        columns[i] = Bytes.toBytes(colNames[i]);
+      }
+      setInputColums(columns);
+    }
+
+    Path[] tableNames = FileInputFormat.getInputPaths(job);
+    if (tableNames == null || tableNames.length == 0) {
+      LOG.error("No input table configured: expecting exactly one input path");
+      return;
+    }
+    String tableName = tableNames[0].getName();
+    try {
+      setHTable(new HTable(new HBaseConfiguration(job), tableName));
+    } catch (Exception e) {
+      // Best effort, as before: log with the cause and leave the table unset.
+      LOG.error("Unable to open input table " + tableName, e);
+    }
+  }
+
+  /**
+   * Checks that the job names exactly one input table and at least one column.
+   *
+   * @param job the job configuration
+   * @throws IOException if there is not exactly one input path, or if the
+   *         column list is unset or empty
+   */
+  public void validateInput(JobConf job) throws IOException {
+    // expecting exactly one path (zero paths is rejected too)
+    Path[] tableNames = FileInputFormat.getInputPaths(job);
+    if (tableNames == null || tableNames.length != 1) {
+      throw new IOException("expecting one table name");
+    }
+
+    // expecting at least one column
+    String colArg = job.get(COLUMN_LIST);
+    if (colArg == null || colArg.length() == 0) {
+      throw new IOException("expecting at least one column");
+    }
+  }
+}
Added: incubator/hama/trunk/src/java/org/apache/hama/mapred/VectorInputFormatBase.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/mapred/VectorInputFormatBase.java?rev=711778&view=auto
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/mapred/VectorInputFormatBase.java (added)
+++ incubator/hama/trunk/src/java/org/apache/hama/mapred/VectorInputFormatBase.java Wed Nov 5 22:36:14 2008
@@ -0,0 +1,265 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.mapred;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Scanner;
+import org.apache.hadoop.hbase.filter.RowFilterInterface;
+import org.apache.hadoop.hbase.filter.RowFilterSet;
+import org.apache.hadoop.hbase.filter.StopRowFilter;
+import org.apache.hadoop.hbase.io.Cell;
+import org.apache.hadoop.hbase.io.RowResult;
+import org.apache.hadoop.hbase.mapred.TableSplit;
+import org.apache.hadoop.hbase.util.Writables;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hama.Constants;
+import org.apache.hama.io.VectorWritable;
+import org.apache.hama.util.Numeric;
+
+/**
+ * Base {@link InputFormat} that reads the rows of an HBase-stored matrix as
+ * ({@link IntWritable}, {@link VectorWritable}) records.
+ *
+ * <p>Subclasses supply the {@link HTable} and the columns to scan. They may
+ * also supply a {@link RowFilterInterface} or a custom
+ * {@link TableRecordReader}.
+ */
+public abstract class VectorInputFormatBase implements
+    InputFormat<IntWritable, VectorWritable> {
+  private byte[][] inputColumns;
+  private HTable table;
+  private TableRecordReader tableRecordReader;
+  private RowFilterInterface rowFilter;
+
+  /**
+   * Iterates over an HBase table and returns (IntWritable, VectorWritable)
+   * pairs. The key is the row key decoded with
+   * {@link Numeric#bytesToInt(byte[])}; the value is a copy of the row.
+   */
+  protected static class TableRecordReader implements
+      RecordReader<IntWritable, VectorWritable> {
+    private byte[] startRow;
+    private byte[] endRow;
+    private RowFilterInterface trrRowFilter;
+    private Scanner scanner;
+    private HTable htable;
+    private byte[][] trrInputColumns;
+
+    /**
+     * Builds the scanner. This is not done in the constructor, so that
+     * subclasses can extend it.
+     *
+     * <p>When a non-empty end row is set, the scan is bounded by it. If a row
+     * filter is also set, the end-row check and the filter are combined, and
+     * both must pass. When no end row is set, the scan starts at the start row
+     * with only the row filter, which may be null.
+     *
+     * @throws IOException if the scanner cannot be opened
+     */
+    public void init() throws IOException {
+      if ((endRow != null) && (endRow.length > 0)) {
+        if (trrRowFilter != null) {
+          final Set<RowFilterInterface> rowFiltersSet =
+              new HashSet<RowFilterInterface>();
+          rowFiltersSet.add(new StopRowFilter(endRow));
+          rowFiltersSet.add(trrRowFilter);
+          this.scanner = this.htable.getScanner(trrInputColumns, startRow,
+              new RowFilterSet(RowFilterSet.Operator.MUST_PASS_ALL,
+                  rowFiltersSet));
+        } else {
+          this.scanner = this.htable.getScanner(trrInputColumns, startRow,
+              endRow);
+        }
+      } else {
+        this.scanner = this.htable.getScanner(trrInputColumns, startRow,
+            trrRowFilter);
+      }
+    }
+
+    /**
+     * @param htable the {@link HTable} to scan.
+     */
+    public void setHTable(HTable htable) {
+      this.htable = htable;
+    }
+
+    /**
+     * @param inputColumns the columns to be placed in {@link VectorWritable}.
+     */
+    public void setInputColumns(final byte[][] inputColumns) {
+      this.trrInputColumns = inputColumns;
+    }
+
+    /**
+     * @param startRow the first row in the split
+     */
+    public void setStartRow(final byte[] startRow) {
+      this.startRow = startRow;
+    }
+
+    /**
+     * @param endRow the end boundary of the split; empty means no bound
+     */
+    public void setEndRow(final byte[] endRow) {
+      this.endRow = endRow;
+    }
+
+    /**
+     * @param rowFilter the {@link RowFilterInterface} to be used.
+     */
+    public void setRowFilter(RowFilterInterface rowFilter) {
+      this.trrRowFilter = rowFilter;
+    }
+
+    /**
+     * Closes the underlying scanner, if {@link #init()} created one.
+     *
+     * @throws IOException declared by {@link RecordReader#close()}
+     */
+    public void close() throws IOException {
+      if (this.scanner != null) {
+        this.scanner.close();
+      }
+    }
+
+    /**
+     * @return IntWritable
+     *
+     * @see org.apache.hadoop.mapred.RecordReader#createKey()
+     */
+    public IntWritable createKey() {
+      return new IntWritable();
+    }
+
+    /**
+     * @return VectorWritable
+     *
+     * @see org.apache.hadoop.mapred.RecordReader#createValue()
+     */
+    public VectorWritable createValue() {
+      return new VectorWritable();
+    }
+
+    /**
+     * Not tracked; always returns 0.
+     */
+    public long getPos() {
+      // This should be the ordinal tuple in the range;
+      // not clear how to calculate...
+      return 0;
+    }
+
+    /**
+     * Not tracked; always returns 0.
+     */
+    public float getProgress() {
+      // Depends on the total number of tuples and getPos
+      return 0;
+    }
+
+    /**
+     * Advances the scanner and copies the next row into {@code key} and
+     * {@code value}.
+     *
+     * @param key receives the row key decoded as an int
+     * @param value receives a copy of the scanned row
+     * @return true if a non-empty row was read, false at the end of the scan
+     * @throws IOException if the scan or the copy fails
+     */
+    public boolean next(IntWritable key, VectorWritable value) throws IOException {
+      RowResult result = this.scanner.next();
+      boolean hasMore = result != null && result.size() > 0;
+      if (hasMore) {
+        key.set(Numeric.bytesToInt(result.getRow()));
+        Writables.copyWritable(result, value);
+      }
+      return hasMore;
+    }
+  }
+
+  /**
+   * Builds a TableRecordReader for {@code split}. If no TableRecordReader was
+   * provided through {@link #setTableRecordReader}, the default one is used.
+   *
+   * @see org.apache.hadoop.mapred.InputFormat#getRecordReader(InputSplit,
+   *      JobConf, Reporter)
+   */
+  public RecordReader<IntWritable, VectorWritable> getRecordReader(
+      InputSplit split, JobConf job, Reporter reporter) throws IOException {
+    TableSplit tSplit = (TableSplit) split;
+    TableRecordReader trr = this.tableRecordReader;
+    // if no table record reader was provided use default
+    if (trr == null) {
+      trr = new TableRecordReader();
+    }
+    trr.setStartRow(tSplit.getStartRow());
+    trr.setEndRow(tSplit.getEndRow());
+    trr.setHTable(this.table);
+    trr.setInputColumns(this.inputColumns);
+    trr.setRowFilter(this.rowFilter);
+    trr.init();
+    return trr;
+  }
+
+  /**
+   * Splits the matrix into contiguous ranges of rows.
+   *
+   * <p>The row count is read from the table's
+   * ({@link Constants#METADATA}, {@link Constants#METADATA_ROWS}) cell. The
+   * number of splits is {@code numSplits}, capped at the row count and never
+   * less than one. Split {@code i} starts at row {@code i * (rows / splits)}.
+   * The last split has an empty end row, so it also covers any remainder.
+   *
+   * @param job the job configuration (unused)
+   * @param numSplits requested number of splits
+   * @return the computed splits (at least one)
+   * @throws IOException if no table is set or the row-count metadata is missing
+   */
+  public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
+    if (this.table == null) {
+      throw new IOException("No input table set; cannot compute splits");
+    }
+    Cell meta = this.table.get(Constants.METADATA, Constants.METADATA_ROWS);
+    if (meta == null) {
+      throw new IOException("Row count metadata is missing from the input table");
+    }
+
+    int rows = Numeric.bytesToInt(meta.getValue());
+    // Clamp to [1, rows]. More splits than rows would create empty ranges, and
+    // zero splits (empty matrix or numSplits <= 0) would divide by zero below.
+    int splitCount = Math.max(1, Math.min(numSplits, rows));
+    int interval = rows / splitCount;
+
+    InputSplit[] splits = new InputSplit[splitCount];
+    for (int i = 0; i < splitCount; i++) {
+      byte[] startRow = Numeric.intToBytes(i * interval);
+      // An empty end row leaves the last split unbounded.
+      byte[] endRow = (i + 1 < splitCount)
+          ? Numeric.intToBytes((i + 1) * interval)
+          : HConstants.EMPTY_START_ROW;
+      splits[i] = new TableSplit(this.table.getTableName(), startRow, endRow);
+    }
+    return splits;
+  }
+
+  /**
+   * @param inputColumns to be passed in {@link VectorWritable} to the map task.
+   */
+  protected void setInputColums(byte[][] inputColumns) {
+    this.inputColumns = inputColumns;
+  }
+
+  /**
+   * Allows subclasses to set the {@link HTable}.
+   *
+   * @param table to get the data from
+   */
+  protected void setHTable(HTable table) {
+    this.table = table;
+  }
+
+  /**
+   * Allows subclasses to set the {@link TableRecordReader}.
+   *
+   * @param tableRecordReader to provide other {@link TableRecordReader}
+   *        implementations.
+   */
+  protected void setTableRecordReader(TableRecordReader tableRecordReader) {
+    this.tableRecordReader = tableRecordReader;
+  }
+
+  /**
+   * Allows subclasses to set the {@link RowFilterInterface} to be used.
+   *
+   * @param rowFilter filter applied to every scanned row; may be null
+   */
+  protected void setRowFilter(RowFilterInterface rowFilter) {
+    this.rowFilter = rowFilter;
+  }
+}
Added: incubator/hama/trunk/src/java/org/apache/hama/mapred/VectorOutputFormat.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/mapred/VectorOutputFormat.java?rev=711778&view=auto
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/mapred/VectorOutputFormat.java (added)
+++ incubator/hama/trunk/src/java/org/apache/hama/mapred/VectorOutputFormat.java Wed Nov 5 22:36:14 2008
@@ -0,0 +1,102 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.mapred;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.mapred.FileAlreadyExistsException;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.InvalidJobConfException;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordWriter;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.util.Progressable;
+import org.apache.hama.io.VectorUpdate;
+
+/**
+ * Output format that commits ({@link IntWritable}, {@link VectorUpdate}) records
+ * to the HBase table named by {@link #OUTPUT_TABLE}. The file-system arguments
+ * inherited from {@link FileOutputFormat} are ignored.
+ */
+public class VectorOutputFormat extends
+    FileOutputFormat<IntWritable, VectorUpdate> {
+
+  /** JobConf parameter that specifies the output table */
+  public static final String OUTPUT_TABLE = "hama.mapred.output";
+  private final static Log LOG = LogFactory.getLog(VectorOutputFormat.class);
+
+  /**
+   * Writes each reduce output value to an HBase table by committing
+   * {@code value.getBatchUpdate()}. The key is not used.
+   */
+  protected static class TableRecordWriter implements
+      RecordWriter<IntWritable, VectorUpdate> {
+    private final HTable table;
+
+    /**
+     * Instantiates a TableRecordWriter that commits to {@code table}.
+     *
+     * @param table the table that receives every update
+     */
+    public TableRecordWriter(HTable table) {
+      this.table = table;
+    }
+
+    /** No-op: {@link #write} commits each update immediately. */
+    public void close(Reporter reporter) {
+    }
+
+    /**
+     * Commits the update carried by {@code value}.
+     *
+     * @param key unused
+     * @param value update to commit
+     * @throws IOException if the commit fails
+     */
+    public void write(IntWritable key, VectorUpdate value) throws IOException {
+      table.commit(value.getBatchUpdate());
+    }
+  }
+
+  /**
+   * Opens the table named by {@link #OUTPUT_TABLE} and returns a writer for it.
+   *
+   * @param ignored unused file system
+   * @param job job configuration holding the output table name
+   * @param name unused
+   * @param progress unused
+   * @return a writer that commits to the output table
+   * @throws IOException if the table cannot be opened
+   */
+  @Override
+  public RecordWriter<IntWritable, VectorUpdate> getRecordWriter(
+      FileSystem ignored, JobConf job, String name, Progressable progress)
+      throws IOException {
+    String tableName = job.get(OUTPUT_TABLE);
+    try {
+      HTable table = new HTable(new HBaseConfiguration(job), tableName);
+      return new TableRecordWriter(table);
+    } catch (IOException e) {
+      LOG.error("Unable to open output table " + tableName, e);
+      throw e;
+    }
+  }
+
+  /**
+   * Checks that an output table name is configured. No file-system checks are
+   * performed.
+   *
+   * @param ignored unused file system
+   * @param job job configuration to check
+   * @throws IOException if {@link #OUTPUT_TABLE} is unset or empty
+   */
+  @Override
+  public void checkOutputSpecs(FileSystem ignored, JobConf job)
+      throws FileAlreadyExistsException, InvalidJobConfException, IOException {
+    String tableName = job.get(OUTPUT_TABLE);
+    if (tableName == null || tableName.length() == 0) {
+      throw new IOException("Must specify table name");
+    }
+  }
+}
Modified: incubator/hama/trunk/src/test/org/apache/hama/mapred/TestMatrixMapReduce.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/test/org/apache/hama/mapred/TestMatrixMapReduce.java?rev=711778&r1=711777&r2=711778&view=diff
==============================================================================
--- incubator/hama/trunk/src/test/org/apache/hama/mapred/TestMatrixMapReduce.java (original)
+++ incubator/hama/trunk/src/test/org/apache/hama/mapred/TestMatrixMapReduce.java Wed Nov 5 22:36:14 2008
@@ -71,7 +71,7 @@
Add1DLayoutMap.initJob(pathA, pathB, Add1DLayoutMap.class, IntWritable.class,
VectorWritable.class, jobConf);
- MatrixReduce.initJob(output, Add1DLayoutReduce.class, jobConf);
+ RowCyclicReduce.initJob(output, Add1DLayoutReduce.class, jobConf);
jobConf.setNumMapTasks(2);
jobConf.setNumReduceTasks(2);