You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by ed...@apache.org on 2009/02/13 07:43:35 UTC
svn commit: r744008 - in /incubator/hama/trunk: ./ src/java/org/apache/hama/
src/java/org/apache/hama/io/ src/java/org/apache/hama/mapred/
src/test/org/apache/hama/
Author: edwardyoon
Date: Fri Feb 13 06:43:34 2009
New Revision: 744008
URL: http://svn.apache.org/viewvc?rev=744008&view=rev
Log:
Implement add(double alpha, Matrix B)
Added:
incubator/hama/trunk/src/java/org/apache/hama/io/HMapWritable.java
incubator/hama/trunk/src/java/org/apache/hama/mapred/HTableInputFormatBase.java
incubator/hama/trunk/src/java/org/apache/hama/mapred/HTableRecordReaderBase.java
Removed:
incubator/hama/trunk/src/java/org/apache/hama/io/MapWritable.java
incubator/hama/trunk/src/java/org/apache/hama/mapred/TableInputFormatBase.java
incubator/hama/trunk/src/java/org/apache/hama/mapred/TableRecordReaderBase.java
Modified:
incubator/hama/trunk/CHANGES.txt
incubator/hama/trunk/src/java/org/apache/hama/AbstractMatrix.java
incubator/hama/trunk/src/java/org/apache/hama/AbstractVector.java
incubator/hama/trunk/src/java/org/apache/hama/DenseMatrix.java
incubator/hama/trunk/src/java/org/apache/hama/DenseVector.java
incubator/hama/trunk/src/java/org/apache/hama/io/VectorWritable.java
incubator/hama/trunk/src/java/org/apache/hama/mapred/BlockInputFormat.java
incubator/hama/trunk/src/java/org/apache/hama/mapred/VectorInputFormat.java
incubator/hama/trunk/src/test/org/apache/hama/TestDenseMatrix.java
Modified: incubator/hama/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/CHANGES.txt?rev=744008&r1=744007&r2=744008&view=diff
==============================================================================
--- incubator/hama/trunk/CHANGES.txt (original)
+++ incubator/hama/trunk/CHANGES.txt Fri Feb 13 06:43:34 2009
@@ -3,7 +3,7 @@
Trunk (unreleased changes)
NEW FEATURES
-
+
HAMA-151: Add multiplication example of file matrices (edwardyoon)
HAMA-145: Add privacy policy page (edwardyoon)
HAMA-83: 2D sqaure blocking for dense matrix multiplication (edwardyoon)
@@ -34,7 +34,8 @@
HAMA-2: The intial donation of Hama from the google project (edwardyoon)
IMPROVEMENTS
-
+
+ HAMA-109: Implement add(double alpha, Matrix B) (edwardyoon)
HAMA-150: Refactor blockingMapRed (edwardyoon)
HAMA-148: Implement of set(double alpha, Matrix B) (edwardyoon)
HAMA-100: Implement of set(Matrix B) (edwardyoon)
@@ -98,7 +99,7 @@
HAMA-9: Upgrade dependencies (edwardyoon)
BUG FIXES
-
+
HAMA-147: Fix typos (edwardyoon)
HAMA-140: In subMatrix(), Scanner should be closed (edwardyoon)
HAMA-120: remove findbugs warning in shell package (samuel via edwardyoon)
Modified: incubator/hama/trunk/src/java/org/apache/hama/AbstractMatrix.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/AbstractMatrix.java?rev=744008&r1=744007&r2=744008&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/AbstractMatrix.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/AbstractMatrix.java Fri Feb 13 06:43:34 2009
@@ -189,6 +189,8 @@
}
public static void setAlpha(double a) {
+ if(alpha.size() > 0)
+ alpha = new ArrayList<Double>();
alpha.add(a);
}
}
Modified: incubator/hama/trunk/src/java/org/apache/hama/AbstractVector.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/AbstractVector.java?rev=744008&r1=744007&r2=744008&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/AbstractVector.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/AbstractVector.java Fri Feb 13 06:43:34 2009
@@ -22,13 +22,13 @@
import java.util.Iterator;
import org.apache.hama.io.DoubleEntry;
-import org.apache.hama.io.MapWritable;
+import org.apache.hama.io.HMapWritable;
/**
* Methods of the vector classes
*/
public abstract class AbstractVector {
- public MapWritable<Integer, DoubleEntry> entries;
+ public HMapWritable<Integer, DoubleEntry> entries;
/**
* Gets the value of index
@@ -56,7 +56,7 @@
public void set(int index, double value) {
// If entries are null, create new object
if(this.entries == null) {
- this.entries = new MapWritable<Integer, DoubleEntry>();
+ this.entries = new HMapWritable<Integer, DoubleEntry>();
}
this.entries.put(index, new DoubleEntry(value));
@@ -91,11 +91,11 @@
}
/**
- * Returns the {@link org.apache.hama.io.MapWritable}
+ * Returns the {@link org.apache.hama.io.HMapWritable}
*
* @return the entries of vector
*/
- public MapWritable<Integer, DoubleEntry> getEntries() {
+ public HMapWritable<Integer, DoubleEntry> getEntries() {
return this.entries;
}
}
Modified: incubator/hama/trunk/src/java/org/apache/hama/DenseMatrix.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/DenseMatrix.java?rev=744008&r1=744007&r2=744008&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/DenseMatrix.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/DenseMatrix.java Fri Feb 13 06:43:34 2009
@@ -46,7 +46,7 @@
import org.apache.hama.io.BlockID;
import org.apache.hama.io.BlockWritable;
import org.apache.hama.io.DoubleEntry;
-import org.apache.hama.io.MapWritable;
+import org.apache.hama.io.HMapWritable;
import org.apache.hama.io.VectorUpdate;
import org.apache.hama.io.VectorWritable;
import org.apache.hama.mapred.CollectBlocksMapper;
@@ -96,6 +96,7 @@
// if force is set to true:
// 1) if this matrixName has aliase to other matrix, we will remove
// the old aliase, create a new matrix table, and aliase to it.
+
// 2) if this matrixName has no aliase to other matrix, we will create
// a new matrix table, and alise to it.
//
@@ -340,7 +341,6 @@
public Matrix add(Matrix B) throws IOException {
Matrix result = new DenseMatrix(config);
-
JobConf jobConf = new JobConf(config);
jobConf.setJobName("addition MR job" + result.getPath());
@@ -358,8 +358,11 @@
}
public Matrix add(double alpha, Matrix B) throws IOException {
- // TODO Auto-generated method stub
- return null;
+ Matrix temp = new DenseMatrix(config);
+ temp.set(alpha, B);
+
+ Matrix result = this.add(temp);
+ return result;
}
public DenseVector getRow(int row) throws IOException {
@@ -371,7 +374,7 @@
byte[][] c = { columnKey };
Scanner scan = table.getScanner(c, HConstants.EMPTY_START_ROW);
- MapWritable<Integer, DoubleEntry> trunk = new MapWritable<Integer, DoubleEntry>();
+ HMapWritable<Integer, DoubleEntry> trunk = new HMapWritable<Integer, DoubleEntry>();
for (RowResult row : scan) {
trunk.put(BytesUtil.bytesToInt(row.getRow()), new DoubleEntry(row
Modified: incubator/hama/trunk/src/java/org/apache/hama/DenseVector.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/DenseVector.java?rev=744008&r1=744007&r2=744008&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/DenseVector.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/DenseVector.java Fri Feb 13 06:43:34 2009
@@ -26,7 +26,7 @@
import org.apache.hadoop.hbase.io.Cell;
import org.apache.hadoop.hbase.io.RowResult;
import org.apache.hama.io.DoubleEntry;
-import org.apache.hama.io.MapWritable;
+import org.apache.hama.io.HMapWritable;
import org.apache.hama.util.BytesUtil;
import org.apache.log4j.Logger;
@@ -34,15 +34,15 @@
static final Logger LOG = Logger.getLogger(DenseVector.class);
public DenseVector() {
- this(new MapWritable<Integer, DoubleEntry>());
+ this(new HMapWritable<Integer, DoubleEntry>());
}
- public DenseVector(MapWritable<Integer, DoubleEntry> m) {
+ public DenseVector(HMapWritable<Integer, DoubleEntry> m) {
this.entries = m;
}
public DenseVector(RowResult row) {
- this.entries = new MapWritable<Integer, DoubleEntry>();
+ this.entries = new HMapWritable<Integer, DoubleEntry>();
for (Map.Entry<byte[], Cell> f : row.entrySet()) {
this.entries.put(BytesUtil.getColumnIndex(f.getKey()),
new DoubleEntry(f.getValue()));
Added: incubator/hama/trunk/src/java/org/apache/hama/io/HMapWritable.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/io/HMapWritable.java?rev=744008&view=auto
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/io/HMapWritable.java (added)
+++ incubator/hama/trunk/src/java/org/apache/hama/io/HMapWritable.java Fri Feb 13 06:43:34 2009
@@ -0,0 +1,189 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.io;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HStoreKey;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hama.util.BytesUtil;
+
+public class HMapWritable<K, V> implements Map<Integer, V>, Writable,
+ Configurable {
+ private AtomicReference<Configuration> conf = new AtomicReference<Configuration>();
+
+ // Static maps of code to class and vice versa. Includes types used in hama
+ // only.
+ static final Map<Byte, Class<?>> CODE_TO_CLASS = new HashMap<Byte, Class<?>>();
+ static final Map<Class<?>, Byte> CLASS_TO_CODE = new HashMap<Class<?>, Byte>();
+
+ static {
+ byte code = 0;
+ addToMap(HStoreKey.class, code++);
+ addToMap(ImmutableBytesWritable.class, code++);
+ addToMap(Text.class, code++);
+ addToMap(DoubleEntry.class, code++);
+ addToMap(byte[].class, code++);
+ }
+
+ @SuppressWarnings("boxing")
+ private static void addToMap(final Class<?> clazz, final byte code) {
+ CLASS_TO_CODE.put(clazz, code);
+ CODE_TO_CLASS.put(code, clazz);
+ }
+
+ private Map<Integer, V> instance = new TreeMap<Integer, V>();
+
+ /** @return the conf */
+ public Configuration getConf() {
+ return conf.get();
+ }
+
+ /** @param conf the conf to set */
+ public void setConf(Configuration conf) {
+ this.conf.set(conf);
+ }
+
+ /** {@inheritDoc} */
+ public void clear() {
+ instance.clear();
+ }
+
+ /** {@inheritDoc} */
+ public boolean containsKey(Object key) {
+ return instance.containsKey(key);
+ }
+
+ /** {@inheritDoc} */
+ public boolean containsValue(Object value) {
+ return instance.containsValue(value);
+ }
+
+ /** {@inheritDoc} */
+ public Set<Entry<Integer, V>> entrySet() {
+ return instance.entrySet();
+ }
+
+ /** {@inheritDoc} */
+ public V get(Object key) {
+ return instance.get(key);
+ }
+
+ /** {@inheritDoc} */
+ public boolean isEmpty() {
+ return instance.isEmpty();
+ }
+
+ /** {@inheritDoc} */
+ public Set<Integer> keySet() {
+ return instance.keySet();
+ }
+
+ /** {@inheritDoc} */
+ public int size() {
+ return instance.size();
+ }
+
+ /** {@inheritDoc} */
+ public Collection<V> values() {
+ return instance.values();
+ }
+
+ // Writable
+
+ /** @return the Class class for the specified id */
+ protected Class<?> getClass(byte id) {
+ return CODE_TO_CLASS.get(id);
+ }
+
+ /** @return the id for the specified Class */
+ protected byte getId(Class<?> clazz) {
+ Byte b = CLASS_TO_CODE.get(clazz);
+ if (b == null) {
+ throw new NullPointerException("Nothing for : " + clazz);
+ }
+ return b;
+ }
+
+ @Override
+ public String toString() {
+ return this.instance.toString();
+ }
+
+ /** {@inheritDoc} */
+ public void write(DataOutput out) throws IOException {
+ // Write out the number of entries in the map
+ out.writeInt(this.instance.size());
+
+ // Then write out each key/value pair
+ for (Map.Entry<Integer, V> e : instance.entrySet()) {
+ Bytes.writeByteArray(out, BytesUtil.getColumnIndex(e.getKey()));
+ out.writeByte(getId(e.getValue().getClass()));
+ ((Writable) e.getValue()).write(out);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings("unchecked")
+public void readFields(DataInput in) throws IOException {
+ // First clear the map. Otherwise we will just accumulate
+ // entries every time this method is called.
+ this.instance.clear();
+
+ // Read the number of entries in the map
+ int entries = in.readInt();
+
+ // Then read each key/value pair
+ for (int i = 0; i < entries; i++) {
+ byte[] key = Bytes.readByteArray(in);
+ Writable value = (Writable) ReflectionUtils.newInstance(getClass(in
+ .readByte()), getConf());
+ value.readFields(in);
+ V v = (V) value;
+ this.instance.put(BytesUtil.getColumnIndex(key), v);
+ }
+ }
+
+ public void putAll(Map<? extends Integer, ? extends V> m) {
+ this.instance.putAll(m);
+ }
+
+ public V remove(Object key) {
+ return this.instance.remove(key);
+ }
+
+ public V put(Integer key, V value) {
+ return this.instance.put(key, value);
+ }
+}
Modified: incubator/hama/trunk/src/java/org/apache/hama/io/VectorWritable.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/io/VectorWritable.java?rev=744008&r1=744007&r2=744008&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/io/VectorWritable.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/io/VectorWritable.java Fri Feb 13 06:43:34 2009
@@ -38,13 +38,13 @@
public class VectorWritable implements Writable, Map<Integer, DoubleEntry> {
public Integer row;
- public MapWritable<Integer, DoubleEntry> entries;
+ public HMapWritable<Integer, DoubleEntry> entries;
public VectorWritable() {
- this(new MapWritable<Integer, DoubleEntry>());
+ this(new HMapWritable<Integer, DoubleEntry>());
}
- public VectorWritable(MapWritable<Integer, DoubleEntry> entries) {
+ public VectorWritable(HMapWritable<Integer, DoubleEntry> entries) {
this.entries = entries;
}
Modified: incubator/hama/trunk/src/java/org/apache/hama/mapred/BlockInputFormat.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/mapred/BlockInputFormat.java?rev=744008&r1=744007&r2=744008&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/mapred/BlockInputFormat.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/mapred/BlockInputFormat.java Fri Feb 13 06:43:34 2009
@@ -39,7 +39,7 @@
import org.apache.hama.io.BlockID;
import org.apache.hama.io.BlockWritable;
-public class BlockInputFormat extends TableInputFormatBase implements
+public class BlockInputFormat extends HTableInputFormatBase implements
InputFormat<BlockID, BlockWritable>, JobConfigurable {
static final Log LOG = LogFactory.getLog(BlockInputFormat.class);
private TableRecordReader tableRecordReader;
@@ -47,7 +47,7 @@
/**
* Iterate over an HBase table data, return (BlockID, BlockWritable) pairs
*/
- protected static class TableRecordReader extends TableRecordReaderBase
+ protected static class TableRecordReader extends HTableRecordReaderBase
implements RecordReader<BlockID, BlockWritable> {
/**
Added: incubator/hama/trunk/src/java/org/apache/hama/mapred/HTableInputFormatBase.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/mapred/HTableInputFormatBase.java?rev=744008&view=auto
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/mapred/HTableInputFormatBase.java (added)
+++ incubator/hama/trunk/src/java/org/apache/hama/mapred/HTableInputFormatBase.java Fri Feb 13 06:43:34 2009
@@ -0,0 +1,154 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.mapred;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.filter.RowFilterInterface;
+import org.apache.hadoop.hbase.mapred.TableSplit;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+
+public abstract class HTableInputFormatBase {
+ private static final Log LOG = LogFactory.getLog(HTableInputFormatBase.class);
+ protected byte[][] inputColumns;
+ protected HTable table;
+ protected RowFilterInterface rowFilter;
+
+ /**
+ * space delimited list of columns
+ */
+ public static final String COLUMN_LIST = "hama.mapred.tablecolumns";
+
+ public void configure(JobConf job) {
+ Path[] tableNames = FileInputFormat.getInputPaths(job);
+ String colArg = job.get(COLUMN_LIST);
+ String[] colNames = colArg.split(" ");
+ byte[][] m_cols = new byte[colNames.length][];
+ for (int i = 0; i < m_cols.length; i++) {
+ m_cols[i] = Bytes.toBytes(colNames[i]);
+ }
+ setInputColums(m_cols);
+ try {
+ setHTable(new HTable(new HBaseConfiguration(job), tableNames[0].getName()));
+ } catch (Exception e) {
+ LOG.error(e);
+ }
+ }
+
+ public void validateInput(JobConf job) throws IOException {
+ // expecting exactly one path
+ Path[] tableNames = FileInputFormat.getInputPaths(job);
+ if (tableNames == null || tableNames.length > 1) {
+ throw new IOException("expecting one table name");
+ }
+
+ // expecting at least one column
+ String colArg = job.get(COLUMN_LIST);
+ if (colArg == null || colArg.length() == 0) {
+ throw new IOException("expecting at least one column");
+ }
+ }
+
+ /**
+ * Calculates the splits that will serve as input for the map tasks.
+ * <p>
+ * The number of splits created is the smaller of numSplits and the number
+ * of {@link HRegion}s in the table. If there are fewer splits than
+ * {@link HRegion}s, each split spans multiple {@link HRegion}s, and the
+ * regions are grouped as evenly as possible. When the splits are uneven,
+ * the bigger splits are placed first in the
+ * {@link InputSplit} array.
+ *
+ * @param job the map task {@link JobConf}
+ * @param numSplits a hint to calculate the number of splits (mapred.map.tasks).
+ *
+ * @return the input splits
+ *
+ * @see org.apache.hadoop.mapred.InputFormat#getSplits(org.apache.hadoop.mapred.JobConf, int)
+ */
+ public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
+ byte [][] startKeys = null;
+ try {
+ startKeys = this.table.getStartKeys();
+ } catch (NullPointerException e) { }
+
+ if (startKeys == null || startKeys.length == 0) {
+ throw new IOException("Expecting at least one region");
+ }
+ if (this.table == null) {
+ throw new IOException("No table was provided");
+ }
+ if (this.inputColumns == null || this.inputColumns.length == 0) {
+ throw new IOException("Expecting at least one column");
+ }
+ int realNumSplits = numSplits > startKeys.length? startKeys.length:
+ numSplits;
+ InputSplit[] splits = new InputSplit[realNumSplits];
+ int middle = startKeys.length / realNumSplits;
+ int startPos = 0;
+ for (int i = 0; i < realNumSplits; i++) {
+ int lastPos = startPos + middle;
+ lastPos = startKeys.length % realNumSplits > i ? lastPos + 1 : lastPos;
+ String regionLocation = table.getRegionLocation(startKeys[startPos]).
+ getServerAddress().getHostname();
+ splits[i] = new TableSplit(this.table.getTableName(),
+ startKeys[startPos], ((i + 1) < realNumSplits) ? startKeys[lastPos]:
+ HConstants.EMPTY_START_ROW, regionLocation);
+ LOG.info("split: " + i + "->" + splits[i]);
+ startPos = lastPos;
+ }
+ return splits;
+ }
+
+ /**
+ * @param inputColumns to be passed to the map task.
+ */
+ protected void setInputColums(byte[][] inputColumns) {
+ this.inputColumns = inputColumns;
+ }
+
+ /**
+ * Allows subclasses to set the {@link HTable}.
+ *
+ * @param table to get the data from
+ */
+ protected void setHTable(HTable table) {
+ this.table = table;
+ }
+
+ /**
+ * Allows subclasses to set the {@link RowFilterInterface} to be used.
+ *
+ * @param rowFilter
+ */
+ protected void setRowFilter(RowFilterInterface rowFilter) {
+ this.rowFilter = rowFilter;
+ }
+}
Added: incubator/hama/trunk/src/java/org/apache/hama/mapred/HTableRecordReaderBase.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/mapred/HTableRecordReaderBase.java?rev=744008&view=auto
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/mapred/HTableRecordReaderBase.java (added)
+++ incubator/hama/trunk/src/java/org/apache/hama/mapred/HTableRecordReaderBase.java Fri Feb 13 06:43:34 2009
@@ -0,0 +1,130 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.mapred;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Scanner;
+import org.apache.hadoop.hbase.filter.RowFilterInterface;
+import org.apache.hadoop.hbase.filter.RowFilterSet;
+import org.apache.hadoop.hbase.filter.StopRowFilter;
+
+public abstract class HTableRecordReaderBase {
+ protected byte[] startRow;
+ protected byte[] endRow;
+ protected byte [] lastRow;
+ protected RowFilterInterface trrRowFilter;
+ protected Scanner scanner;
+ protected HTable htable;
+ protected byte[][] trrInputColumns;
+
+ /**
+ * Restart from survivable exceptions by creating a new scanner.
+ *
+ * @param firstRow
+ * @throws IOException
+ */
+ public void restart(byte[] firstRow) throws IOException {
+ if ((endRow != null) && (endRow.length > 0)) {
+ if (trrRowFilter != null) {
+ final Set<RowFilterInterface> rowFiltersSet =
+ new HashSet<RowFilterInterface>();
+ rowFiltersSet.add(new StopRowFilter(endRow));
+ rowFiltersSet.add(trrRowFilter);
+ this.scanner = this.htable.getScanner(trrInputColumns, startRow,
+ new RowFilterSet(RowFilterSet.Operator.MUST_PASS_ALL,
+ rowFiltersSet));
+ } else {
+ this.scanner =
+ this.htable.getScanner(trrInputColumns, firstRow, endRow);
+ }
+ } else {
+ this.scanner =
+ this.htable.getScanner(trrInputColumns, firstRow, trrRowFilter);
+ }
+ }
+
+ /**
+ * Build the scanner. Not done in constructor to allow for extension.
+ *
+ * @throws IOException
+ */
+ public void init() throws IOException {
+ restart(startRow);
+ }
+
+ /**
+ * @param htable the {@link HTable} to scan.
+ */
+ public void setHTable(HTable htable) {
+ this.htable = htable;
+ }
+
+ /**
+ * @param inputColumns the columns
+ */
+ public void setInputColumns(final byte[][] inputColumns) {
+ byte[][] columns = inputColumns;
+ this.trrInputColumns = columns;
+ }
+
+ /**
+ * @param startRow the first row in the split
+ */
+ public void setStartRow(final byte[] startRow) {
+ byte[] sRow = startRow;
+ this.startRow = sRow;
+ }
+
+ /**
+ *
+ * @param endRow the last row in the split
+ */
+ public void setEndRow(final byte[] endRow) {
+ byte[] eRow = endRow;
+ this.endRow = eRow;
+ }
+
+ /**
+ * @param rowFilter the {@link RowFilterInterface} to be used.
+ */
+ public void setRowFilter(RowFilterInterface rowFilter) {
+ this.trrRowFilter = rowFilter;
+ }
+
+ public void close() throws IOException {
+ this.scanner.close();
+ }
+
+ public long getPos() {
+ // This should be the ordinal tuple in the range;
+ // not clear how to calculate...
+ return 0;
+ }
+
+ public float getProgress() {
+ // Depends on the total number of tuples and getPos
+ return 0;
+ }
+
+}
Modified: incubator/hama/trunk/src/java/org/apache/hama/mapred/VectorInputFormat.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/mapred/VectorInputFormat.java?rev=744008&r1=744007&r2=744008&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/mapred/VectorInputFormat.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/mapred/VectorInputFormat.java Fri Feb 13 06:43:34 2009
@@ -39,7 +39,7 @@
import org.apache.hama.io.VectorWritable;
import org.apache.hama.util.BytesUtil;
-public class VectorInputFormat extends TableInputFormatBase implements
+public class VectorInputFormat extends HTableInputFormatBase implements
InputFormat<IntWritable, VectorWritable>, JobConfigurable {
static final Log LOG = LogFactory.getLog(VectorInputFormat.class);
private TableRecordReader tableRecordReader;
@@ -47,7 +47,7 @@
/**
* Iterate over an HBase table data, return (IntWritable, VectorWritable) pairs
*/
- protected static class TableRecordReader extends TableRecordReaderBase
+ protected static class TableRecordReader extends HTableRecordReaderBase
implements RecordReader<IntWritable, VectorWritable> {
private int totalRows;
Modified: incubator/hama/trunk/src/test/org/apache/hama/TestDenseMatrix.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/test/org/apache/hama/TestDenseMatrix.java?rev=744008&r1=744007&r2=744008&view=diff
==============================================================================
--- incubator/hama/trunk/src/test/org/apache/hama/TestDenseMatrix.java (original)
+++ incubator/hama/trunk/src/test/org/apache/hama/TestDenseMatrix.java Fri Feb 13 06:43:34 2009
@@ -168,6 +168,34 @@
verifyMultResult(m1, m2, result);
}
+ public void testSetMatrix() throws IOException {
+ Matrix a = new DenseMatrix(conf);
+ a.set(m1);
+
+ for (int i = 0; i < 5; i++) {
+ int x = RandomVariable.randInt(0, 10);
+ int y = RandomVariable.randInt(0, 10);
+ assertEquals(a.get(x, y), m1.get(x, y));
+ }
+ }
+
+ public void testSetAlphaMatrix() throws IOException {
+ Matrix a = new DenseMatrix(conf);
+ a.set(0.5, m1);
+
+ for (int i = 0; i < 5; i++) {
+ int x = RandomVariable.randInt(0, 10);
+ int y = RandomVariable.randInt(0, 10);
+ assertEquals(a.get(x, y), (m1.get(x, y) * 0.5));
+ }
+ }
+
+ public void testAddAlphaMatrix() throws IOException {
+ double value = m1.get(0, 0) + (m2.get(0, 0) * 0.1);
+ Matrix result = m1.add(0.1, m2);
+ assertEquals(value, result.get(0, 0));
+ }
+
public void testSetRow() throws IOException {
Vector v = new DenseVector();
double[] entries = new double[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 0 };
@@ -204,28 +232,6 @@
}
}
- public void testSetMatrix() throws IOException {
- Matrix a = new DenseMatrix(conf);
- a.set(m1);
-
- for (int i = 0; i < 5; i++) {
- int x = RandomVariable.randInt(0, 10);
- int y = RandomVariable.randInt(0, 10);
- assertEquals(a.get(x, y), m1.get(x, y));
- }
- }
-
- public void testSetAlphaMatrix() throws IOException {
- Matrix a = new DenseMatrix(conf);
- a.set(0.5, m1);
-
- for (int i = 0; i < 5; i++) {
- int x = RandomVariable.randInt(0, 10);
- int y = RandomVariable.randInt(0, 10);
- assertEquals(a.get(x, y), (m1.get(x, y) * 0.5));
- }
- }
-
public void testLoadSave() throws IOException {
String path1 = m1.getPath();
// save m1 to aliase1