Posted to commits@hama.apache.org by ed...@apache.org on 2009/09/21 06:40:15 UTC
svn commit: r817156 [1/3] - in /incubator/hama/trunk: ./
src/examples/org/apache/hama/examples/ src/java/org/apache/hama/
src/java/org/apache/hama/algebra/ src/java/org/apache/hama/graph/
src/java/org/apache/hama/io/ src/java/org/apache/hama/mapred/ sr...
Author: edwardyoon
Date: Mon Sep 21 04:40:10 2009
New Revision: 817156
URL: http://svn.apache.org/viewvc?rev=817156&view=rev
Log:
Refactor top-level package
Added:
incubator/hama/trunk/src/java/org/apache/hama/matrix/
incubator/hama/trunk/src/java/org/apache/hama/matrix/AbstractMatrix.java
incubator/hama/trunk/src/java/org/apache/hama/matrix/AbstractVector.java
incubator/hama/trunk/src/java/org/apache/hama/matrix/DenseMatrix.java
incubator/hama/trunk/src/java/org/apache/hama/matrix/DenseVector.java
incubator/hama/trunk/src/java/org/apache/hama/matrix/Matrix.java
incubator/hama/trunk/src/java/org/apache/hama/matrix/SparseMatrix.java
incubator/hama/trunk/src/java/org/apache/hama/matrix/SparseVector.java
incubator/hama/trunk/src/java/org/apache/hama/matrix/SubMatrix.java
incubator/hama/trunk/src/java/org/apache/hama/matrix/Vector.java
incubator/hama/trunk/src/java/org/apache/hama/matrix/algebra/
incubator/hama/trunk/src/java/org/apache/hama/matrix/algebra/BlockMultiplyMap.java
incubator/hama/trunk/src/java/org/apache/hama/matrix/algebra/BlockMultiplyReduce.java
incubator/hama/trunk/src/java/org/apache/hama/matrix/algebra/DenseMatrixVectorMultMap.java
incubator/hama/trunk/src/java/org/apache/hama/matrix/algebra/DenseMatrixVectorMultReduce.java
incubator/hama/trunk/src/java/org/apache/hama/matrix/algebra/JacobiEigenValue.java
incubator/hama/trunk/src/java/org/apache/hama/matrix/algebra/MatrixNormMap.java
incubator/hama/trunk/src/java/org/apache/hama/matrix/algebra/MatrixNormMapRed.java
incubator/hama/trunk/src/java/org/apache/hama/matrix/algebra/MatrixNormReduce.java
incubator/hama/trunk/src/java/org/apache/hama/matrix/algebra/RowCyclicAdditionMap.java
incubator/hama/trunk/src/java/org/apache/hama/matrix/algebra/RowCyclicAdditionReduce.java
incubator/hama/trunk/src/java/org/apache/hama/matrix/algebra/SparseMatrixVectorMultMap.java
incubator/hama/trunk/src/java/org/apache/hama/matrix/algebra/SparseMatrixVectorMultReduce.java
incubator/hama/trunk/src/java/org/apache/hama/matrix/algebra/TransposeMap.java
incubator/hama/trunk/src/java/org/apache/hama/matrix/algebra/TransposeReduce.java
incubator/hama/trunk/src/java/org/apache/hama/matrix/algebra/package.html
incubator/hama/trunk/src/java/org/apache/hama/matrix/package.html
- copied unchanged from r817136, incubator/hama/trunk/src/java/org/apache/hama/package.html
incubator/hama/trunk/src/test/org/apache/hama/matrix/
incubator/hama/trunk/src/test/org/apache/hama/matrix/HCluster.java
incubator/hama/trunk/src/test/org/apache/hama/matrix/MatrixTestCommon.java
incubator/hama/trunk/src/test/org/apache/hama/matrix/TestDenseMatrix.java
incubator/hama/trunk/src/test/org/apache/hama/matrix/TestDenseVector.java
incubator/hama/trunk/src/test/org/apache/hama/matrix/TestMatrixVectorMult.java
incubator/hama/trunk/src/test/org/apache/hama/matrix/TestSingularValueDecomposition.java
incubator/hama/trunk/src/test/org/apache/hama/matrix/TestSparseMatrix.java
incubator/hama/trunk/src/test/org/apache/hama/matrix/TestSparseVector.java
Removed:
incubator/hama/trunk/src/java/org/apache/hama/AbstractMatrix.java
incubator/hama/trunk/src/java/org/apache/hama/AbstractVector.java
incubator/hama/trunk/src/java/org/apache/hama/DenseMatrix.java
incubator/hama/trunk/src/java/org/apache/hama/DenseVector.java
incubator/hama/trunk/src/java/org/apache/hama/Matrix.java
incubator/hama/trunk/src/java/org/apache/hama/SparseMatrix.java
incubator/hama/trunk/src/java/org/apache/hama/SparseVector.java
incubator/hama/trunk/src/java/org/apache/hama/SubMatrix.java
incubator/hama/trunk/src/java/org/apache/hama/Vector.java
incubator/hama/trunk/src/java/org/apache/hama/algebra/
incubator/hama/trunk/src/java/org/apache/hama/package.html
incubator/hama/trunk/src/java/org/apache/hama/sparse/
incubator/hama/trunk/src/test/org/apache/hama/HCluster.java
incubator/hama/trunk/src/test/org/apache/hama/MatrixTestCommon.java
incubator/hama/trunk/src/test/org/apache/hama/TestDenseMatrix.java
incubator/hama/trunk/src/test/org/apache/hama/TestDenseVector.java
incubator/hama/trunk/src/test/org/apache/hama/TestMatrixVectorMult.java
incubator/hama/trunk/src/test/org/apache/hama/TestSingularValueDecomposition.java
incubator/hama/trunk/src/test/org/apache/hama/TestSparseMatrix.java
incubator/hama/trunk/src/test/org/apache/hama/TestSparseVector.java
Modified:
incubator/hama/trunk/CHANGES.txt
incubator/hama/trunk/src/examples/org/apache/hama/examples/FileMatrixBlockMult.java
incubator/hama/trunk/src/examples/org/apache/hama/examples/MatrixAddition.java
incubator/hama/trunk/src/examples/org/apache/hama/examples/MatrixMultiplication.java
incubator/hama/trunk/src/examples/org/apache/hama/examples/MatrixNorm.java
incubator/hama/trunk/src/examples/org/apache/hama/examples/RandomMatrix.java
incubator/hama/trunk/src/java/org/apache/hama/HamaAdmin.java
incubator/hama/trunk/src/java/org/apache/hama/HamaAdminImpl.java
incubator/hama/trunk/src/java/org/apache/hama/graph/SparseGraph.java
incubator/hama/trunk/src/java/org/apache/hama/io/BlockWritable.java
incubator/hama/trunk/src/java/org/apache/hama/mapred/BlockOutputFormat.java
incubator/hama/trunk/src/java/org/apache/hama/mapred/CollectBlocksMapper.java
incubator/hama/trunk/src/java/org/apache/hama/mapred/CollectBlocksReducer.java
incubator/hama/trunk/src/java/org/apache/hama/mapred/RandomMatrixMap.java
incubator/hama/trunk/src/java/org/apache/hama/mapred/VectorInputFormat.java
incubator/hama/trunk/src/java/org/apache/hama/shell/HamaShellEnv.java
incubator/hama/trunk/src/java/org/apache/hama/shell/execution/AlgebraOperator.java
incubator/hama/trunk/src/java/org/apache/hama/shell/execution/RandMatrixOperation.java
incubator/hama/trunk/src/java/org/apache/hama/shell/execution/SaveExpression.java
incubator/hama/trunk/src/java/org/apache/hama/util/JobManager.java
incubator/hama/trunk/src/test/org/apache/hama/examples/TestFileMatrixBlockMult.java
incubator/hama/trunk/src/test/org/apache/hama/graph/TestGraph.java
incubator/hama/trunk/src/test/org/apache/hama/mapred/TestBlockMatrixMapReduce.java
incubator/hama/trunk/src/test/org/apache/hama/mapred/TestRandomMatrixMapReduce.java
incubator/hama/trunk/src/test/org/apache/hama/shell/parser/expression/TestHamaExpressionParser.java
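For downstream code, this refactor is a pure package move: the matrix and algebra classes keep their names and APIs but now live under org.apache.hama.matrix and org.apache.hama.matrix.algebra. A minimal, hypothetical client class (the class name, dimensions, and values below are illustrative and not part of this commit; it assumes the usual no-argument HamaConfiguration constructor) only needs its imports updated, roughly:

    // Before r817156:
    //   import org.apache.hama.DenseMatrix;
    //   import org.apache.hama.Matrix;
    //
    // After r817156 the same classes live under org.apache.hama.matrix:
    import java.io.IOException;

    import org.apache.hama.HamaConfiguration;
    import org.apache.hama.matrix.DenseMatrix;

    public class ImportMigrationExample {
      public static void main(String[] args) throws IOException {
        HamaConfiguration conf = new HamaConfiguration();
        // The constructor and methods are unchanged by the move; only the
        // package names in the imports differ from pre-r817156 code.
        DenseMatrix m = new DenseMatrix(conf, 10, 10);
        m.set(0, 0, 3.14);
        m.close();
      }
    }
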
Modified: incubator/hama/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/CHANGES.txt?rev=817156&r1=817155&r2=817156&view=diff
==============================================================================
--- incubator/hama/trunk/CHANGES.txt (original)
+++ incubator/hama/trunk/CHANGES.txt Mon Sep 21 04:40:10 2009
@@ -40,6 +40,7 @@
IMPROVEMENTS
+ HAMA-192: Refactor top-level package (edwardyoon)
HAMA-189: Update website (edwardyoon)
HAMA-187: Add matrix subtraction test case (edwardyoon)
HAMA-186: Add density option for random matrix (edwardyoon)
Modified: incubator/hama/trunk/src/examples/org/apache/hama/examples/FileMatrixBlockMult.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/examples/org/apache/hama/examples/FileMatrixBlockMult.java?rev=817156&r1=817155&r2=817156&view=diff
==============================================================================
--- incubator/hama/trunk/src/examples/org/apache/hama/examples/FileMatrixBlockMult.java (original)
+++ incubator/hama/trunk/src/examples/org/apache/hama/examples/FileMatrixBlockMult.java Mon Sep 21 04:40:10 2009
@@ -38,15 +38,15 @@
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.SequenceFileInputFormat;
import org.apache.hama.Constants;
-import org.apache.hama.DenseMatrix;
-import org.apache.hama.DenseVector;
import org.apache.hama.HamaConfiguration;
-import org.apache.hama.algebra.BlockMultiplyMap;
-import org.apache.hama.algebra.BlockMultiplyReduce;
import org.apache.hama.io.BlockID;
import org.apache.hama.io.BlockWritable;
import org.apache.hama.mapred.CollectBlocksMap;
import org.apache.hama.mapred.CollectBlocksMapReduceBase;
+import org.apache.hama.matrix.DenseMatrix;
+import org.apache.hama.matrix.DenseVector;
+import org.apache.hama.matrix.algebra.BlockMultiplyMap;
+import org.apache.hama.matrix.algebra.BlockMultiplyReduce;
import org.apache.hama.util.JobManager;
import org.apache.hama.util.RandomVariable;
Modified: incubator/hama/trunk/src/examples/org/apache/hama/examples/MatrixAddition.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/examples/org/apache/hama/examples/MatrixAddition.java?rev=817156&r1=817155&r2=817156&view=diff
==============================================================================
--- incubator/hama/trunk/src/examples/org/apache/hama/examples/MatrixAddition.java (original)
+++ incubator/hama/trunk/src/examples/org/apache/hama/examples/MatrixAddition.java Mon Sep 21 04:40:10 2009
@@ -21,8 +21,8 @@
import java.io.IOException;
-import org.apache.hama.DenseMatrix;
-import org.apache.hama.Matrix;
+import org.apache.hama.matrix.DenseMatrix;
+import org.apache.hama.matrix.Matrix;
public class MatrixAddition extends AbstractExample {
public static void main(String[] args) throws IOException {
Modified: incubator/hama/trunk/src/examples/org/apache/hama/examples/MatrixMultiplication.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/examples/org/apache/hama/examples/MatrixMultiplication.java?rev=817156&r1=817155&r2=817156&view=diff
==============================================================================
--- incubator/hama/trunk/src/examples/org/apache/hama/examples/MatrixMultiplication.java (original)
+++ incubator/hama/trunk/src/examples/org/apache/hama/examples/MatrixMultiplication.java Mon Sep 21 04:40:10 2009
@@ -21,11 +21,11 @@
import java.io.IOException;
-import org.apache.hama.DenseMatrix;
import org.apache.hama.HamaAdmin;
import org.apache.hama.HamaAdminImpl;
-import org.apache.hama.Matrix;
-import org.apache.hama.SparseMatrix;
+import org.apache.hama.matrix.DenseMatrix;
+import org.apache.hama.matrix.Matrix;
+import org.apache.hama.matrix.SparseMatrix;
public class MatrixMultiplication extends AbstractExample {
public static void main(String[] args) throws IOException {
Modified: incubator/hama/trunk/src/examples/org/apache/hama/examples/MatrixNorm.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/examples/org/apache/hama/examples/MatrixNorm.java?rev=817156&r1=817155&r2=817156&view=diff
==============================================================================
--- incubator/hama/trunk/src/examples/org/apache/hama/examples/MatrixNorm.java (original)
+++ incubator/hama/trunk/src/examples/org/apache/hama/examples/MatrixNorm.java Mon Sep 21 04:40:10 2009
@@ -23,8 +23,8 @@
import org.apache.hama.HamaAdmin;
import org.apache.hama.HamaAdminImpl;
-import org.apache.hama.Matrix;
-import org.apache.hama.Matrix.Norm;
+import org.apache.hama.matrix.Matrix;
+import org.apache.hama.matrix.Matrix.Norm;
public class MatrixNorm extends AbstractExample {
public static void main(String[] args) throws IOException {
Modified: incubator/hama/trunk/src/examples/org/apache/hama/examples/RandomMatrix.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/examples/org/apache/hama/examples/RandomMatrix.java?rev=817156&r1=817155&r2=817156&view=diff
==============================================================================
--- incubator/hama/trunk/src/examples/org/apache/hama/examples/RandomMatrix.java (original)
+++ incubator/hama/trunk/src/examples/org/apache/hama/examples/RandomMatrix.java Mon Sep 21 04:40:10 2009
@@ -21,9 +21,9 @@
import java.io.IOException;
-import org.apache.hama.DenseMatrix;
-import org.apache.hama.Matrix;
-import org.apache.hama.SparseMatrix;
+import org.apache.hama.matrix.DenseMatrix;
+import org.apache.hama.matrix.Matrix;
+import org.apache.hama.matrix.SparseMatrix;
public class RandomMatrix extends AbstractExample {
Modified: incubator/hama/trunk/src/java/org/apache/hama/HamaAdmin.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/HamaAdmin.java?rev=817156&r1=817155&r2=817156&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/HamaAdmin.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/HamaAdmin.java Mon Sep 21 04:40:10 2009
@@ -21,6 +21,8 @@
import java.io.IOException;
+import org.apache.hama.matrix.Matrix;
+
/**
* A administration interface to manage the matrix's namespace, and table
* allocation & garbage collection.
Modified: incubator/hama/trunk/src/java/org/apache/hama/HamaAdminImpl.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/HamaAdminImpl.java?rev=817156&r1=817155&r2=817156&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/HamaAdminImpl.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/HamaAdminImpl.java Mon Sep 21 04:40:10 2009
@@ -30,6 +30,9 @@
import org.apache.hadoop.hbase.io.BatchUpdate;
import org.apache.hadoop.hbase.io.Cell;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hama.matrix.DenseMatrix;
+import org.apache.hama.matrix.Matrix;
+import org.apache.hama.matrix.SparseMatrix;
import org.apache.log4j.Logger;
/**
Modified: incubator/hama/trunk/src/java/org/apache/hama/graph/SparseGraph.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/graph/SparseGraph.java?rev=817156&r1=817155&r2=817156&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/graph/SparseGraph.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/graph/SparseGraph.java Mon Sep 21 04:40:10 2009
@@ -33,8 +33,8 @@
import org.apache.hadoop.io.Writable;
import org.apache.hama.Constants;
import org.apache.hama.HamaConfiguration;
-import org.apache.hama.SparseMatrix;
-import org.apache.hama.SparseVector;
+import org.apache.hama.matrix.SparseMatrix;
+import org.apache.hama.matrix.SparseVector;
/**
* A implementation of a graph that is optimized to store edge sparse graphs
Modified: incubator/hama/trunk/src/java/org/apache/hama/io/BlockWritable.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/io/BlockWritable.java?rev=817156&r1=817155&r2=817156&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/io/BlockWritable.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/io/BlockWritable.java Mon Sep 21 04:40:10 2009
@@ -30,7 +30,7 @@
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.Writable;
import org.apache.hama.Constants;
-import org.apache.hama.SubMatrix;
+import org.apache.hama.matrix.SubMatrix;
public class BlockWritable implements Writable {
static final Log LOG = LogFactory.getLog(BlockWritable.class);
Modified: incubator/hama/trunk/src/java/org/apache/hama/mapred/BlockOutputFormat.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/mapred/BlockOutputFormat.java?rev=817156&r1=817155&r2=817156&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/mapred/BlockOutputFormat.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/mapred/BlockOutputFormat.java Mon Sep 21 04:40:10 2009
@@ -34,9 +34,9 @@
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.util.Progressable;
import org.apache.hama.Constants;
-import org.apache.hama.SubMatrix;
import org.apache.hama.io.BlockID;
import org.apache.hama.io.BlockWritable;
+import org.apache.hama.matrix.SubMatrix;
public class BlockOutputFormat extends
FileOutputFormat<BlockID, BlockWritable> {
Modified: incubator/hama/trunk/src/java/org/apache/hama/mapred/CollectBlocksMapper.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/mapred/CollectBlocksMapper.java?rev=817156&r1=817155&r2=817156&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/mapred/CollectBlocksMapper.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/mapred/CollectBlocksMapper.java Mon Sep 21 04:40:10 2009
@@ -22,8 +22,8 @@
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
-import org.apache.hama.DenseVector;
import org.apache.hama.io.BlockID;
+import org.apache.hama.matrix.DenseVector;
import org.apache.log4j.Logger;
/**
Modified: incubator/hama/trunk/src/java/org/apache/hama/mapred/CollectBlocksReducer.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/mapred/CollectBlocksReducer.java?rev=817156&r1=817155&r2=817156&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/mapred/CollectBlocksReducer.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/mapred/CollectBlocksReducer.java Mon Sep 21 04:40:10 2009
@@ -25,10 +25,10 @@
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
-import org.apache.hama.DenseVector;
-import org.apache.hama.SubMatrix;
import org.apache.hama.io.BlockID;
import org.apache.hama.io.BlockWritable;
+import org.apache.hama.matrix.DenseVector;
+import org.apache.hama.matrix.SubMatrix;
/**
* Rows are named as c(i, j) with sequential number ((N^2 * i) + ((j * N) + k)
Modified: incubator/hama/trunk/src/java/org/apache/hama/mapred/RandomMatrixMap.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/mapred/RandomMatrixMap.java?rev=817156&r1=817155&r2=817156&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/mapred/RandomMatrixMap.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/mapred/RandomMatrixMap.java Mon Sep 21 04:40:10 2009
@@ -28,9 +28,9 @@
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
-import org.apache.hama.DenseVector;
-import org.apache.hama.SparseVector;
-import org.apache.hama.Vector;
+import org.apache.hama.matrix.DenseVector;
+import org.apache.hama.matrix.SparseVector;
+import org.apache.hama.matrix.Vector;
import org.apache.hama.util.RandomVariable;
import org.apache.log4j.Logger;
Modified: incubator/hama/trunk/src/java/org/apache/hama/mapred/VectorInputFormat.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/mapred/VectorInputFormat.java?rev=817156&r1=817155&r2=817156&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/mapred/VectorInputFormat.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/mapred/VectorInputFormat.java Mon Sep 21 04:40:10 2009
@@ -37,7 +37,7 @@
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.util.StringUtils;
-import org.apache.hama.DenseVector;
+import org.apache.hama.matrix.DenseVector;
import org.apache.hama.util.BytesUtil;
public class VectorInputFormat extends HTableInputFormatBase implements
Added: incubator/hama/trunk/src/java/org/apache/hama/matrix/AbstractMatrix.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/matrix/AbstractMatrix.java?rev=817156&view=auto
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/matrix/AbstractMatrix.java (added)
+++ incubator/hama/trunk/src/java/org/apache/hama/matrix/AbstractMatrix.java Mon Sep 21 04:40:10 2009
@@ -0,0 +1,571 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.matrix;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.MasterNotRunningException;
+import org.apache.hadoop.hbase.RegionException;
+import org.apache.hadoop.hbase.HColumnDescriptor.CompressionType;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.io.BatchUpdate;
+import org.apache.hadoop.hbase.io.Cell;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.io.RowResult;
+import org.apache.hadoop.hbase.mapred.IdentityTableReduce;
+import org.apache.hadoop.hbase.mapred.TableMap;
+import org.apache.hadoop.hbase.mapred.TableMapReduceUtil;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.MapWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hama.Constants;
+import org.apache.hama.HamaAdmin;
+import org.apache.hama.HamaAdminImpl;
+import org.apache.hama.HamaConfiguration;
+import org.apache.hama.io.VectorUpdate;
+import org.apache.hama.matrix.algebra.JacobiEigenValue;
+import org.apache.hama.matrix.algebra.MatrixNormMapRed;
+import org.apache.hama.matrix.algebra.TransposeMap;
+import org.apache.hama.matrix.algebra.TransposeReduce;
+import org.apache.hama.matrix.algebra.MatrixNormMapRed.MatrixFrobeniusNormCombiner;
+import org.apache.hama.matrix.algebra.MatrixNormMapRed.MatrixFrobeniusNormMapper;
+import org.apache.hama.matrix.algebra.MatrixNormMapRed.MatrixFrobeniusNormReducer;
+import org.apache.hama.matrix.algebra.MatrixNormMapRed.MatrixInfinityNormMapper;
+import org.apache.hama.matrix.algebra.MatrixNormMapRed.MatrixInfinityNormReducer;
+import org.apache.hama.matrix.algebra.MatrixNormMapRed.MatrixMaxValueNormMapper;
+import org.apache.hama.matrix.algebra.MatrixNormMapRed.MatrixMaxValueNormReducer;
+import org.apache.hama.matrix.algebra.MatrixNormMapRed.MatrixOneNormCombiner;
+import org.apache.hama.matrix.algebra.MatrixNormMapRed.MatrixOneNormMapper;
+import org.apache.hama.matrix.algebra.MatrixNormMapRed.MatrixOneNormReducer;
+import org.apache.hama.util.BytesUtil;
+import org.apache.hama.util.JobManager;
+import org.apache.hama.util.RandomVariable;
+import org.apache.log4j.Logger;
+
+/**
+ * Methods of the matrix classes
+ */
+public abstract class AbstractMatrix implements Matrix {
+ static int tryPathLength = Constants.DEFAULT_PATH_LENGTH;
+ static final Logger LOG = Logger.getLogger(AbstractMatrix.class);
+
+ protected HamaConfiguration config;
+ protected HBaseAdmin admin;
+ // a matrix just needs a table path to point to the table which stores the matrix.
+ // let HamaAdmin manage the matrix namespace.
+ protected String matrixPath;
+ protected HTable table;
+ protected HTableDescriptor tableDesc;
+ protected HamaAdmin hamaAdmin;
+
+ protected boolean closed = true;
+
+ /**
+ * Sets the job configuration
+ *
+ * @param conf configuration object
+ * @throws MasterNotRunningException
+ */
+ public void setConfiguration(HamaConfiguration conf)
+ throws MasterNotRunningException {
+ this.config = conf;
+ this.admin = new HBaseAdmin(config);
+
+ hamaAdmin = new HamaAdminImpl(conf, admin);
+ }
+
+ /**
+ * Tries to create a new matrix with a new random name. The number of attempts
+ * will be (Integer.MAX_VALUE - 4) * DEFAULT_TRY_TIMES;
+ *
+ * @throws IOException
+ */
+ protected void tryToCreateTable(String table_prefix) throws IOException {
+ int tryTimes = Constants.DEFAULT_TRY_TIMES;
+ do {
+ matrixPath = table_prefix + "_"
+ + RandomVariable.randMatrixPath(tryPathLength);
+
+ if (!admin.tableExists(matrixPath)) { // no table 'matrixPath' in hbase.
+ tableDesc = new HTableDescriptor(matrixPath);
+ create();
+ return;
+ }
+
+ tryTimes--;
+ if (tryTimes <= 0) { // this loop has exhausted DEFAULT_TRY_TIMES.
+ tryPathLength++;
+ tryTimes = Constants.DEFAULT_TRY_TIMES;
+ }
+
+ } while (tryPathLength <= Constants.DEFAULT_MAXPATHLEN);
+ // the tries are exhausted;
+ // throw an IOException to let the user know what happened.
+ throw new IOException("Try too many times to create a table in hbase.");
+ }
+
+ /**
+ * Create matrix space
+ */
+ protected void create() throws IOException {
+ // It should run only when table doesn't exist.
+ if (!admin.tableExists(matrixPath)) {
+ this.tableDesc.addFamily(new HColumnDescriptor(Bytes
+ .toBytes(Constants.COLUMN), 3, CompressionType.NONE, false, false,
+ Integer.MAX_VALUE, HConstants.FOREVER, false));
+ this.tableDesc.addFamily(new HColumnDescriptor(Constants.ATTRIBUTE));
+ this.tableDesc.addFamily(new HColumnDescriptor(Constants.ALIASEFAMILY));
+ // It's temporary data.
+ this.tableDesc.addFamily(new HColumnDescriptor(Bytes
+ .toBytes(Constants.BLOCK), 1, CompressionType.NONE, false, false,
+ Integer.MAX_VALUE, HConstants.FOREVER, false));
+ // the following families are used in JacobiEigenValue computation
+ this.tableDesc.addFamily(new HColumnDescriptor(Bytes
+ .toBytes(JacobiEigenValue.EI), 1, CompressionType.NONE, false, false,
+ Integer.MAX_VALUE, HConstants.FOREVER, false));
+ this.tableDesc.addFamily(new HColumnDescriptor(Bytes
+ .toBytes(JacobiEigenValue.EICOL), 10, CompressionType.NONE, false,
+ false, Integer.MAX_VALUE, HConstants.FOREVER, false));
+ this.tableDesc.addFamily(new HColumnDescriptor(Bytes
+ .toBytes(JacobiEigenValue.EIVEC), 10, CompressionType.NONE, false,
+ false, Integer.MAX_VALUE, HConstants.FOREVER, false));
+
+ LOG.info("Initializing the matrix storage.");
+ this.admin.createTable(this.tableDesc);
+ LOG.info("Create Matrix " + matrixPath);
+
+ // connect to the table.
+ table = new HTable(config, matrixPath);
+ table.setAutoFlush(true);
+
+ // Record the matrix type in METADATA_TYPE
+ BatchUpdate update = new BatchUpdate(Constants.METADATA);
+ update.put(Constants.METADATA_TYPE, Bytes.toBytes(this.getClass()
+ .getSimpleName()));
+
+ table.commit(update);
+
+ // the new matrix's reference is 1.
+ setReference(1);
+ }
+ }
+
+ public HTable getHTable() {
+ return this.table;
+ }
+
+ protected double getNorm1() throws IOException {
+ JobConf jobConf = new JobConf(config);
+ jobConf.setJobName("norm1 MR job : " + this.getPath());
+
+ jobConf.setNumMapTasks(config.getNumMapTasks());
+ jobConf.setNumReduceTasks(1);
+
+ final FileSystem fs = FileSystem.get(jobConf);
+ Path outDir = new Path(new Path(getType() + "_TMP_norm1_dir_"
+ + System.currentTimeMillis()), "out");
+ if (fs.exists(outDir))
+ fs.delete(outDir, true);
+
+ MatrixNormMapRed.initJob(this.getPath(), outDir.toString(),
+ MatrixOneNormMapper.class, MatrixOneNormCombiner.class,
+ MatrixOneNormReducer.class, jobConf);
+
+ // update the output dir of the job
+ outDir = FileOutputFormat.getOutputPath(jobConf);
+ JobManager.execute(jobConf);
+
+ // read outputs
+ Path inFile = new Path(outDir, "reduce-out");
+ IntWritable numInside = new IntWritable();
+ DoubleWritable max = new DoubleWritable();
+ SequenceFile.Reader reader = new SequenceFile.Reader(fs, inFile, jobConf);
+ try {
+ reader.next(numInside, max);
+ } finally {
+ reader.close();
+ }
+
+ fs.delete(outDir.getParent(), true);
+ return max.get();
+ }
+
+ protected double getMaxvalue() throws IOException {
+ JobConf jobConf = new JobConf(config);
+ jobConf.setJobName("MaxValue Norm MR job : " + this.getPath());
+
+ jobConf.setNumMapTasks(config.getNumMapTasks());
+ jobConf.setNumReduceTasks(1);
+
+ final FileSystem fs = FileSystem.get(jobConf);
+ Path outDir = new Path(new Path(getType() + "_TMP_normMaxValue_dir_"
+ + System.currentTimeMillis()), "out");
+ if (fs.exists(outDir))
+ fs.delete(outDir, true);
+
+ MatrixNormMapRed.initJob(this.getPath(), outDir.toString(),
+ MatrixMaxValueNormMapper.class, MatrixMaxValueNormReducer.class,
+ MatrixMaxValueNormReducer.class, jobConf);
+
+ // update the output dir of the job
+ outDir = FileOutputFormat.getOutputPath(jobConf);
+ JobManager.execute(jobConf);
+
+ // read outputs
+ Path inFile = new Path(outDir, "part-00000");
+ IntWritable numInside = new IntWritable();
+ DoubleWritable max = new DoubleWritable();
+ SequenceFile.Reader reader = new SequenceFile.Reader(fs, inFile, jobConf);
+ try {
+ reader.next(numInside, max);
+ } finally {
+ reader.close();
+ }
+
+ fs.delete(outDir.getParent(), true);
+ return max.get();
+ }
+
+ protected double getInfinity() throws IOException {
+ JobConf jobConf = new JobConf(config);
+ jobConf.setJobName("Infinity Norm MR job : " + this.getPath());
+
+ jobConf.setNumMapTasks(config.getNumMapTasks());
+ jobConf.setNumReduceTasks(1);
+
+ final FileSystem fs = FileSystem.get(jobConf);
+ Path outDir = new Path(new Path(getType() + "_TMP_normInifity_dir_"
+ + System.currentTimeMillis()), "out");
+ if (fs.exists(outDir))
+ fs.delete(outDir, true);
+
+ MatrixNormMapRed.initJob(this.getPath(), outDir.toString(),
+ MatrixInfinityNormMapper.class, MatrixInfinityNormReducer.class,
+ MatrixInfinityNormReducer.class, jobConf);
+
+ // update the output dir of the job
+ outDir = FileOutputFormat.getOutputPath(jobConf);
+
+ JobManager.execute(jobConf);
+
+ // read outputs
+ Path inFile = new Path(outDir, "part-00000");
+ IntWritable numInside = new IntWritable();
+ DoubleWritable max = new DoubleWritable();
+ SequenceFile.Reader reader = new SequenceFile.Reader(fs, inFile, jobConf);
+ try {
+ reader.next(numInside, max);
+ } finally {
+ reader.close();
+ }
+
+ fs.delete(outDir.getParent(), true);
+ return max.get();
+ }
+
+ protected double getFrobenius() throws IOException {
+ JobConf jobConf = new JobConf(config);
+ jobConf.setJobName("Frobenius Norm MR job : " + this.getPath());
+
+ jobConf.setNumMapTasks(config.getNumMapTasks());
+ jobConf.setNumReduceTasks(1);
+
+ final FileSystem fs = FileSystem.get(jobConf);
+ Path outDir = new Path(new Path(getType() + "_TMP_normFrobenius_dir_"
+ + System.currentTimeMillis()), "out");
+ if (fs.exists(outDir))
+ fs.delete(outDir, true);
+
+ MatrixNormMapRed.initJob(this.getPath(), outDir.toString(),
+ MatrixFrobeniusNormMapper.class, MatrixFrobeniusNormCombiner.class,
+ MatrixFrobeniusNormReducer.class, jobConf);
+
+ // update the output dir of the job
+ outDir = FileOutputFormat.getOutputPath(jobConf);
+
+ JobManager.execute(jobConf);
+
+ // read outputs
+ Path inFile = new Path(outDir, "part-00000");
+ IntWritable numInside = new IntWritable();
+ DoubleWritable sqrt = new DoubleWritable();
+ SequenceFile.Reader reader = new SequenceFile.Reader(fs, inFile, jobConf);
+ try {
+ reader.next(numInside, sqrt);
+ } finally {
+ reader.close();
+ }
+
+ fs.delete(outDir.getParent(), true);
+ return sqrt.get();
+ }
+
+ /** {@inheritDoc} */
+ public int getRows() throws IOException {
+ Cell rows = null;
+ rows = table.get(Constants.METADATA, Constants.METADATA_ROWS);
+ return (rows != null) ? BytesUtil.bytesToInt(rows.getValue()) : 0;
+ }
+
+ /** {@inheritDoc} */
+ public int getColumns() throws IOException {
+ Cell columns = table.get(Constants.METADATA, Constants.METADATA_COLUMNS);
+ return BytesUtil.bytesToInt(columns.getValue());
+ }
+
+ /** {@inheritDoc} */
+ public String getRowLabel(int row) throws IOException {
+ Cell rows = null;
+ rows = table.get(BytesUtil.getRowIndex(row), Bytes
+ .toBytes(Constants.ATTRIBUTE + "string"));
+
+ return (rows != null) ? Bytes.toString(rows.getValue()) : null;
+ }
+
+ /** {@inheritDoc} */
+ public String getColumnLabel(int column) throws IOException {
+ Cell rows = null;
+ rows = table.get(Constants.CINDEX, (Constants.ATTRIBUTE + column));
+ return (rows != null) ? Bytes.toString(rows.getValue()) : null;
+ }
+
+ /** {@inheritDoc} */
+ public void setRowLabel(int row, String name) throws IOException {
+ VectorUpdate update = new VectorUpdate(row);
+ update.put(Constants.ATTRIBUTE + "string", name);
+ table.commit(update.getBatchUpdate());
+ }
+
+ /** {@inheritDoc} */
+ public void setDimension(int rows, int columns) throws IOException {
+ VectorUpdate update = new VectorUpdate(Constants.METADATA);
+ update.put(Constants.METADATA_ROWS, rows);
+ update.put(Constants.METADATA_COLUMNS, columns);
+
+ table.commit(update.getBatchUpdate());
+ }
+
+ /** {@inheritDoc} */
+ public void add(int i, int j, double value) throws IOException {
+ VectorUpdate update = new VectorUpdate(i);
+ update.put(j, value + this.get(i, j));
+ table.commit(update.getBatchUpdate());
+
+ }
+
+ /**
+ * Just performs a full scan of a table.
+ */
+ public static class TableReadMapper extends MapReduceBase implements
+ TableMap<ImmutableBytesWritable, BatchUpdate> {
+ private static List<Double> alpha = new ArrayList<Double>();
+
+ @SuppressWarnings("unchecked")
+ public void map(ImmutableBytesWritable key, RowResult value,
+ OutputCollector<ImmutableBytesWritable, BatchUpdate> output,
+ @SuppressWarnings("unused")
+ Reporter reporter) throws IOException {
+
+ BatchUpdate update = new BatchUpdate(key.get());
+ for (Map.Entry<byte[], Cell> e : value.entrySet()) {
+ if (alpha.size() == 0) {
+ update.put(e.getKey(), e.getValue().getValue());
+ } else {
+ String column = new String(e.getKey());
+ if (column.startsWith(Constants.COLUMN)) {
+ double currValue = BytesUtil.bytesToDouble(e.getValue().getValue());
+ update.put(e.getKey(), (BytesUtil.doubleToBytes(currValue
+ * alpha.get(0))));
+ } else {
+ update.put(e.getKey(), e.getValue().getValue());
+ }
+ }
+ }
+ output.collect(key, update);
+ }
+
+ public static void setAlpha(double a) {
+ if (alpha.size() > 0)
+ alpha = new ArrayList<Double>();
+ alpha.add(a);
+ }
+ }
+
+ /** {@inheritDoc} */
+ public Matrix set(Matrix B) throws IOException {
+ JobConf jobConf = new JobConf(config);
+ jobConf.setJobName("set MR job : " + this.getPath());
+
+ jobConf.setNumMapTasks(config.getNumMapTasks());
+ jobConf.setNumReduceTasks(config.getNumReduceTasks());
+
+ TableMapReduceUtil.initTableMapJob(B.getPath(), Constants.COLUMN + " "
+ + Constants.ATTRIBUTE + " " + Constants.ALIASEFAMILY + " "
+ + Constants.BLOCK, TableReadMapper.class, ImmutableBytesWritable.class,
+ BatchUpdate.class, jobConf);
+ TableMapReduceUtil.initTableReduceJob(this.getPath(),
+ IdentityTableReduce.class, jobConf);
+
+ JobManager.execute(jobConf);
+ return this;
+ }
+
+ /** {@inheritDoc} */
+ public Matrix set(double alpha, Matrix B) throws IOException {
+ JobConf jobConf = new JobConf(config);
+ jobConf.setJobName("set MR job : " + this.getPath());
+
+ jobConf.setNumMapTasks(config.getNumMapTasks());
+ jobConf.setNumReduceTasks(config.getNumReduceTasks());
+
+ TableReadMapper.setAlpha(alpha);
+ TableMapReduceUtil.initTableMapJob(B.getPath(), Constants.COLUMN + " "
+ + Constants.ATTRIBUTE + " " + Constants.ALIASEFAMILY + " "
+ + Constants.BLOCK, TableReadMapper.class, ImmutableBytesWritable.class,
+ BatchUpdate.class, jobConf);
+ TableMapReduceUtil.initTableReduceJob(this.getPath(),
+ IdentityTableReduce.class, jobConf);
+
+ JobManager.execute(jobConf);
+ return this;
+ }
+
+ /** {@inheritDoc} */
+ public void setColumnLabel(int column, String name) throws IOException {
+ VectorUpdate update = new VectorUpdate(Constants.CINDEX);
+ update.put(column, name);
+ table.commit(update.getBatchUpdate());
+ }
+
+ /** {@inheritDoc} */
+ public String getPath() {
+ return matrixPath;
+ }
+
+ protected void setReference(int reference) throws IOException {
+ BatchUpdate update = new BatchUpdate(Constants.METADATA);
+ update.put(Constants.METADATA_REFERENCE, Bytes.toBytes(reference));
+ table.commit(update);
+
+ }
+
+ protected int incrementAndGetRef() throws IOException {
+ int reference = 1;
+ Cell rows = null;
+ rows = table.get(Constants.METADATA, Constants.METADATA_REFERENCE);
+ if (rows != null) {
+ reference = Bytes.toInt(rows.getValue());
+ reference++;
+ }
+ setReference(reference);
+ return reference;
+ }
+
+ protected int decrementAndGetRef() throws IOException {
+ int reference = 0;
+ Cell rows = null;
+ rows = table.get(Constants.METADATA, Constants.METADATA_REFERENCE);
+ if (rows != null) {
+ reference = Bytes.toInt(rows.getValue());
+ if (reference > 0) // when reference==0, we need not decrement it.
+ reference--;
+ }
+ setReference(reference);
+ return reference;
+ }
+
+ protected boolean hasAliaseName() throws IOException {
+ Cell rows = null;
+ rows = table.get(Constants.METADATA, Constants.ALIASENAME);
+ return (rows != null) ? true : false;
+ }
+
+ public void close() throws IOException {
+ if (closed) // have been closed
+ return;
+ int reference = decrementAndGetRef();
+ if (reference <= 0) { // no references remain.
+ if (!hasAliaseName()) { // the table has not been aliased, so we delete the
+ // table.
+ if (admin.isTableEnabled(matrixPath)) {
+ while (admin.isTableEnabled(matrixPath)) {
+ try {
+ admin.disableTable(matrixPath);
+ } catch (RegionException e) {
+ LOG.warn(e);
+ }
+ }
+
+ admin.deleteTable(matrixPath);
+ }
+ }
+ }
+ closed = true;
+ }
+
+ public Matrix transpose() throws IOException {
+ Matrix result;
+ if (this.getType().equals("SparseMatrix")) {
+ result = new SparseMatrix(config, this.getRows(), this.getColumns());
+ } else {
+ result = new DenseMatrix(config, this.getRows(), this.getColumns());
+ }
+
+ JobConf jobConf = new JobConf(config);
+ jobConf.setJobName("transpose MR job" + result.getPath());
+
+ jobConf.setNumMapTasks(config.getNumMapTasks());
+ jobConf.setNumReduceTasks(config.getNumReduceTasks());
+
+ TransposeMap.initJob(this.getPath(), TransposeMap.class, IntWritable.class,
+ MapWritable.class, jobConf);
+ TransposeReduce.initJob(result.getPath(), TransposeReduce.class, jobConf);
+
+ JobManager.execute(jobConf);
+ return result;
+ }
+
+ public boolean save(String aliasename) throws IOException {
+ // mark & update the aliase name in the "alise:name" meta column.
+ // note: one matrix has only one aliasename for now.
+ BatchUpdate update = new BatchUpdate(Constants.METADATA);
+ update.put(Constants.ALIASENAME, Bytes.toBytes(aliasename));
+ update.put(Constants.ATTRIBUTE + "type", Bytes.toBytes(this.getType()));
+ table.commit(update);
+
+ return hamaAdmin.save(this, aliasename);
+ }
+}
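As a rough illustration of the table lifecycle implemented above (tryToCreateTable/create on construction, save to record an alias, and close with reference counting), a caller might use it along these lines. This is a sketch only; the alias name and dimensions are made up, and it assumes the DenseMatrix constructors added later in this commit and a no-argument HamaConfiguration constructor:

    import java.io.IOException;

    import org.apache.hama.HamaConfiguration;
    import org.apache.hama.matrix.DenseMatrix;

    public class MatrixLifecycleSketch {
      public static void main(String[] args) throws IOException {
        HamaConfiguration conf = new HamaConfiguration();

        // Creates a backing HBase table with a random name (tryToCreateTable)
        // and initializes its reference count to 1 (setReference(1) in create()).
        DenseMatrix a = new DenseMatrix(conf, 100, 100);

        // Records the alias in the METADATA row and registers it with HamaAdmin,
        // so hasAliaseName() becomes true for this table.
        a.save("matrixA");

        // Decrements the reference count; because the matrix is aliased,
        // close() does not disable or delete the underlying table.
        a.close();

        // Reopen by alias later: with force=false the existing table is loaded
        // and its reference count is incremented (incrementAndGetRef).
        DenseMatrix b = new DenseMatrix(conf, "matrixA", false);
        b.close();
      }
    }
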
Added: incubator/hama/trunk/src/java/org/apache/hama/matrix/AbstractVector.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/matrix/AbstractVector.java?rev=817156&view=auto
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/matrix/AbstractVector.java (added)
+++ incubator/hama/trunk/src/java/org/apache/hama/matrix/AbstractVector.java Mon Sep 21 04:40:10 2009
@@ -0,0 +1,98 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.matrix;
+
+import java.util.Iterator;
+import java.util.Map;
+
+import org.apache.hadoop.hbase.io.Cell;
+import org.apache.hadoop.hbase.io.RowResult;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.MapWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hama.io.DoubleEntry;
+import org.apache.hama.util.BytesUtil;
+import org.apache.log4j.Logger;
+
+/**
+ * Methods of the vector classes
+ */
+public abstract class AbstractVector {
+ static final Logger LOG = Logger.getLogger(AbstractVector.class);
+ protected MapWritable entries;
+
+ public void initMap(RowResult row) {
+ this.entries = new MapWritable();
+ for (Map.Entry<byte[], Cell> f : row.entrySet()) {
+ this.entries.put(new IntWritable(BytesUtil.getColumnIndex(f.getKey())),
+ new DoubleEntry(f.getValue()));
+ }
+ }
+
+ /**
+ * Returns an Iterator.
+ *
+ * @return iterator
+ */
+ public Iterator<Writable> iterator() {
+ return this.entries.values().iterator();
+ }
+
+ /**
+ * Returns the size of the vector. If the vector is sparse, returns the number
+ * of non-zero elements only.
+ *
+ * @return the size of the vector
+ */
+ public int size() {
+ int x = 0;
+ if (this.entries != null && this.entries.containsKey(new Text("row")))
+ x = 1;
+
+ return (this.entries != null) ? this.entries.size() - x : 0;
+ }
+
+ /**
+ * Returns the {@link org.apache.hadoop.io.MapWritable}
+ *
+ * @return the entries of vector
+ */
+ public MapWritable getEntries() {
+ return this.entries;
+ }
+
+ /**
+ * Checks for conformant sizes
+ */
+ protected void checkComformantSize(Vector v2) {
+ if (this.size() != v2.size()) {
+ throw new IndexOutOfBoundsException("v1.size != v2.size (" + this.size()
+ + " != " + v2.size() + ")");
+ }
+ }
+
+ /**
+ * Clears the entries.
+ */
+ public void clear() {
+ this.entries = null;
+ }
+}
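The vector base class above stores its entries in a Hadoop MapWritable keyed by column index (IntWritable) with DoubleEntry values. A small, hypothetical sketch of how a concrete vector built on it is populated and read back might look like the following; it assumes the DenseVector added later in this commit, whose no-argument constructor and set(int, double) are used the same way by DenseMatrix.random() below:

    import java.util.Map;

    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Writable;
    import org.apache.hama.io.DoubleEntry;
    import org.apache.hama.matrix.DenseVector;

    public class VectorEntriesSketch {
      public static void main(String[] args) {
        DenseVector v = new DenseVector();
        v.set(0, 1.5);   // stored as IntWritable(0) -> DoubleEntry(1.5)
        v.set(2, -2.0);

        // size() reports the number of stored entries (for a sparse vector,
        // only the non-zero ones), skipping the special "row" key if present.
        System.out.println("size = " + v.size());

        // getEntries() exposes the backing MapWritable from AbstractVector.
        for (Map.Entry<Writable, Writable> e : v.getEntries().entrySet()) {
          int column = ((IntWritable) e.getKey()).get();
          double value = ((DoubleEntry) e.getValue()).getValue();
          System.out.println(column + " -> " + value);
        }
      }
    }
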
Added: incubator/hama/trunk/src/java/org/apache/hama/matrix/DenseMatrix.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/matrix/DenseMatrix.java?rev=817156&view=auto
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/matrix/DenseMatrix.java (added)
+++ incubator/hama/trunk/src/java/org/apache/hama/matrix/DenseMatrix.java Mon Sep 21 04:40:10 2009
@@ -0,0 +1,933 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.matrix;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Scanner;
+import org.apache.hadoop.hbase.io.Cell;
+import org.apache.hadoop.hbase.io.RowResult;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.MapWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.SequenceFileInputFormat;
+import org.apache.hadoop.mapred.SequenceFileOutputFormat;
+import org.apache.hadoop.mapred.lib.NullOutputFormat;
+import org.apache.hama.Constants;
+import org.apache.hama.HamaConfiguration;
+import org.apache.hama.io.BlockID;
+import org.apache.hama.io.BlockWritable;
+import org.apache.hama.io.DoubleEntry;
+import org.apache.hama.io.Pair;
+import org.apache.hama.io.VectorUpdate;
+import org.apache.hama.mapred.CollectBlocksMapper;
+import org.apache.hama.mapred.DummyMapper;
+import org.apache.hama.mapred.RandomMatrixMap;
+import org.apache.hama.mapred.RandomMatrixReduce;
+import org.apache.hama.mapred.VectorInputFormat;
+import org.apache.hama.matrix.Matrix.Norm;
+import org.apache.hama.matrix.algebra.BlockMultiplyMap;
+import org.apache.hama.matrix.algebra.BlockMultiplyReduce;
+import org.apache.hama.matrix.algebra.DenseMatrixVectorMultMap;
+import org.apache.hama.matrix.algebra.DenseMatrixVectorMultReduce;
+import org.apache.hama.matrix.algebra.JacobiEigenValue;
+import org.apache.hama.matrix.algebra.RowCyclicAdditionMap;
+import org.apache.hama.matrix.algebra.RowCyclicAdditionReduce;
+import org.apache.hama.util.BytesUtil;
+import org.apache.hama.util.JobManager;
+import org.apache.hama.util.RandomVariable;
+
+/**
+ * This class represents a dense matrix.
+ */
+public class DenseMatrix extends AbstractMatrix implements Matrix {
+ static private final String TABLE_PREFIX = DenseMatrix.class.getSimpleName();
+ static private final Path TMP_DIR = new Path(DenseMatrix.class
+ .getSimpleName()
+ + "_TMP_dir");
+
+ /**
+ * Construct a raw matrix. Just create a table in HBase.
+ *
+ * @param conf configuration object
+ * @param m the number of rows.
+ * @param n the number of columns.
+ * @throws IOException thrown to let the user know what happened
+ * if we didn't create the matrix successfully.
+ */
+ public DenseMatrix(HamaConfiguration conf, int m, int n) throws IOException {
+ setConfiguration(conf);
+
+ tryToCreateTable(TABLE_PREFIX);
+ closed = false;
+ this.setDimension(m, n);
+ }
+
+ /**
+ * Create/load a matrix aliased as 'matrixName'.
+ *
+ * @param conf configuration object
+ * @param matrixName the name of the matrix
+ * @param force if force is true, a new matrix will be created regardless of
+ * whether 'matrixName' is already aliased to an existing matrix; otherwise,
+ * just try to load an existing matrix aliased as 'matrixName'.
+ * @throws IOException
+ */
+ public DenseMatrix(HamaConfiguration conf, String matrixName, boolean force)
+ throws IOException {
+ setConfiguration(conf);
+ // if force is set to true:
+ // 1) if this matrixName is already aliased to another matrix, we will remove
+ // the old alias, create a new matrix table, and alias to it.
+
+ // 2) if this matrixName is not aliased to another matrix, we will create
+ // a new matrix table, and alias to it.
+ //
+ // if force is set to false, we just try to load an existing matrix aliased
+ // as 'matrixName'.
+
+ boolean existed = hamaAdmin.matrixExists(matrixName);
+
+ if (force) {
+ if (existed) {
+ // remove the old alias
+ hamaAdmin.delete(matrixName);
+ }
+ // create a new matrix table.
+ tryToCreateTable(TABLE_PREFIX);
+ // save the new alias relationship
+ save(matrixName);
+ } else {
+ if (existed) {
+ // try to get the actual path of the table
+ matrixPath = hamaAdmin.getPath(matrixName);
+ // load the matrix
+ table = new HTable(conf, matrixPath);
+ // increment the reference
+ incrementAndGetRef();
+ } else {
+ throw new IOException("Try to load non-existed matrix alised as "
+ + matrixName);
+ }
+ }
+
+ closed = false;
+ }
+
+ /**
+ * Load a matrix from an existing matrix table whose table name is 'matrixpath'.
+ * It is for internal use by map/reduce.
+ *
+ * @param conf configuration object
+ * @param matrixpath
+ * @throws IOException
+ */
+ public DenseMatrix(HamaConfiguration conf, String matrixpath)
+ throws IOException {
+ setConfiguration(conf);
+ matrixPath = matrixpath;
+ // load the matrix
+ table = new HTable(conf, matrixPath);
+ // TODO: for now we don't increment the reference of the table,
+ // since it's for internal use by map/reduce.
+ // if we wanted to increment the reference of the table,
+ // we wouldn't know where to call Matrix.close in the Add & Mul map/reduce
+ // process to decrement the reference. It seems difficult.
+ }
+
+ /**
+ * Create an m-by-n constant matrix.
+ *
+ * @param conf configuration object
+ * @param m the number of rows.
+ * @param n the number of columns.
+ * @param s fill the matrix with this scalar value.
+ * @throws IOException thrown to let the user know what happened
+ * if we didn't create the matrix successfully.
+ */
+ public DenseMatrix(HamaConfiguration conf, int m, int n, double s)
+ throws IOException {
+ setConfiguration(conf);
+
+ tryToCreateTable(TABLE_PREFIX);
+
+ closed = false;
+
+ for (int i = 0; i < m; i++) {
+ for (int j = 0; j < n; j++) {
+ set(i, j, s);
+ }
+ }
+
+ setDimension(m, n);
+ }
+
+ /**
+ * Generate matrix with random elements
+ *
+ * @param conf configuration object
+ * @param m the number of rows.
+ * @param n the number of columns.
+ * @return an m-by-n matrix with uniformly distributed random elements.
+ * @throws IOException
+ */
+ public static DenseMatrix random(HamaConfiguration conf, int m, int n)
+ throws IOException {
+ DenseMatrix rand = new DenseMatrix(conf, m, n);
+ DenseVector vector = new DenseVector();
+ LOG.info("Create the " + m + " * " + n + " random matrix : "
+ + rand.getPath());
+
+ for (int i = 0; i < m; i++) {
+ vector.clear();
+ for (int j = 0; j < n; j++) {
+ vector.set(j, RandomVariable.rand());
+ }
+ rand.setRow(i, vector);
+ }
+
+ return rand;
+ }
+
+ /**
+ * Generate matrix with random elements using Map/Reduce
+ *
+ * @param conf configuration object
+ * @param m the number of rows.
+ * @param n the number of columns.
+ * @return an m-by-n matrix with uniformly distributed random elements.
+ * @throws IOException
+ */
+ public static DenseMatrix random_mapred(HamaConfiguration conf, int m, int n)
+ throws IOException {
+ DenseMatrix rand = new DenseMatrix(conf, m, n);
+ LOG.info("Create the " + m + " * " + n + " random matrix : "
+ + rand.getPath());
+
+ JobConf jobConf = new JobConf(conf);
+ jobConf.setJobName("random matrix MR job : " + rand.getPath());
+
+ jobConf.setNumMapTasks(conf.getNumMapTasks());
+ jobConf.setNumReduceTasks(conf.getNumReduceTasks());
+
+ final Path inDir = new Path(TMP_DIR, "in");
+ FileInputFormat.setInputPaths(jobConf, inDir);
+ jobConf.setMapperClass(RandomMatrixMap.class);
+ jobConf.setMapOutputKeyClass(IntWritable.class);
+ jobConf.setMapOutputValueClass(MapWritable.class);
+
+ RandomMatrixReduce.initJob(rand.getPath(), RandomMatrixReduce.class,
+ jobConf);
+ jobConf.setSpeculativeExecution(false);
+ jobConf.setInt("matrix.column", n);
+ jobConf.set("matrix.type", TABLE_PREFIX);
+ jobConf.set("matrix.density", "100");
+
+ jobConf.setInputFormat(SequenceFileInputFormat.class);
+ final FileSystem fs = FileSystem.get(jobConf);
+ int interval = m / conf.getNumMapTasks();
+
+ // generate an input file for each map task
+ for (int i = 0; i < conf.getNumMapTasks(); ++i) {
+ final Path file = new Path(inDir, "part" + i);
+ final IntWritable start = new IntWritable(i * interval);
+ IntWritable end = null;
+ if ((i + 1) != conf.getNumMapTasks()) {
+ end = new IntWritable(((i * interval) + interval) - 1);
+ } else {
+ end = new IntWritable(m - 1);
+ }
+ final SequenceFile.Writer writer = SequenceFile.createWriter(fs, jobConf,
+ file, IntWritable.class, IntWritable.class, CompressionType.NONE);
+ try {
+ writer.append(start, end);
+ } finally {
+ writer.close();
+ }
+ System.out.println("Wrote input for Map #" + i);
+ }
+
+ JobClient.runJob(jobConf);
+ fs.delete(TMP_DIR, true);
+ return rand;
+ }
+
+ /**
+ * Generate identity matrix
+ *
+ * @param conf configuration object
+ * @param m the number of rows.
+ * @param n the number of columns.
+ * @return an m-by-n matrix with ones on the diagonal and zeros elsewhere.
+ * @throws IOException
+ */
+ public static DenseMatrix identity(HamaConfiguration conf, int m, int n)
+ throws IOException {
+ DenseMatrix identity = new DenseMatrix(conf, m, n);
+ LOG.info("Create the " + m + " * " + n + " identity matrix : "
+ + identity.getPath());
+
+ for (int i = 0; i < m; i++) {
+ DenseVector vector = new DenseVector();
+ for (int j = 0; j < n; j++) {
+ vector.set(j, (i == j ? 1.0 : 0.0));
+ }
+ identity.setRow(i, vector);
+ }
+
+ return identity;
+ }
+
+ /**
+ * Gets the double value of (i, j)
+ *
+ * @param i ith row of the matrix
+ * @param j jth column of the matrix
+ * @return the value of the entry, or zero if the entry is null
+ * @throws IOException
+ */
+ public double get(int i, int j) throws IOException {
+ if (this.getRows() < i || this.getColumns() < j)
+ throw new ArrayIndexOutOfBoundsException(i + ", " + j);
+
+ Cell c = table.get(BytesUtil.getRowIndex(i), BytesUtil.getColumnIndex(j));
+ if (c == null)
+ throw new NullPointerException("Unexpected null");
+
+ return BytesUtil.bytesToDouble(c.getValue());
+ }
+
+ /**
+ * Gets the vector of row
+ *
+ * @param i the row index of the matrix
+ * @return the vector of row
+ * @throws IOException
+ */
+ public DenseVector getRow(int i) throws IOException {
+ return new DenseVector(table.getRow(BytesUtil.getRowIndex(i),
+ new byte[][] { Bytes.toBytes(Constants.COLUMN) }));
+ }
+
+ /**
+ * Gets the vector of column
+ *
+ * @param j the column index of the matrix
+ * @return the vector of column
+ * @throws IOException
+ */
+ public DenseVector getColumn(int j) throws IOException {
+ byte[] columnKey = BytesUtil.getColumnIndex(j);
+ byte[][] c = { columnKey };
+ Scanner scan = table.getScanner(c, HConstants.EMPTY_START_ROW);
+
+ MapWritable trunk = new MapWritable();
+
+ for (RowResult row : scan) {
+ trunk.put(new IntWritable(BytesUtil.bytesToInt(row.getRow())),
+ new DoubleEntry(row.get(columnKey)));
+ }
+
+ return new DenseVector(trunk);
+ }
+
+ /** {@inheritDoc} */
+ public void set(int i, int j, double value) throws IOException {
+ if (this.getRows() < i || this.getColumns() < j)
+ throw new ArrayIndexOutOfBoundsException(i + ", " + j);
+ VectorUpdate update = new VectorUpdate(i);
+ update.put(j, value);
+ table.commit(update.getBatchUpdate());
+ }
+
+ /**
+ * Set the row of a matrix to a given vector
+ *
+ * @param row
+ * @param vector
+ * @throws IOException
+ */
+ public void setRow(int row, Vector vector) throws IOException {
+ if (this.getRows() < row || this.getColumns() < vector.size())
+ throw new ArrayIndexOutOfBoundsException(row);
+
+ VectorUpdate update = new VectorUpdate(row);
+ update.putAll(vector.getEntries());
+ table.commit(update.getBatchUpdate());
+ }
+
+ /**
+ * Set the column of a matrix to a given vector
+ *
+ * @param column
+ * @param vector
+ * @throws IOException
+ */
+ public void setColumn(int column, Vector vector) throws IOException {
+ if (this.getColumns() < column || this.getRows() < vector.size())
+ throw new ArrayIndexOutOfBoundsException(column);
+
+ for (Map.Entry<Writable, Writable> e : vector.getEntries().entrySet()) {
+ int key = ((IntWritable) e.getKey()).get();
+ double value = ((DoubleEntry) e.getValue()).getValue();
+ VectorUpdate update = new VectorUpdate(key);
+ update.put(column, value);
+ table.commit(update.getBatchUpdate());
+ }
+ }
+
+ /**
+ * C = alpha*B + A
+ *
+ * @param alpha
+ * @param B
+ * @return C
+ * @throws IOException
+ */
+ public DenseMatrix add(double alpha, Matrix B) throws IOException {
+ ensureForAddition(B);
+
+ DenseMatrix result = new DenseMatrix(config, this.getRows(), this
+ .getColumns());
+ JobConf jobConf = new JobConf(config);
+ jobConf.setJobName("addition MR job" + result.getPath());
+
+ jobConf.setNumMapTasks(config.getNumMapTasks());
+ jobConf.setNumReduceTasks(config.getNumReduceTasks());
+
+ RowCyclicAdditionMap.initJob(this.getPath(), B.getPath(), Double
+ .toString(alpha), RowCyclicAdditionMap.class, IntWritable.class,
+ MapWritable.class, jobConf);
+ RowCyclicAdditionReduce.initJob(result.getPath(),
+ RowCyclicAdditionReduce.class, jobConf);
+
+ JobManager.execute(jobConf);
+ return result;
+ }
+
+ /**
+ * C = B + A
+ *
+ * @param B
+ * @return C
+ * @throws IOException
+ */
+ public DenseMatrix add(Matrix B) throws IOException {
+ return add(1.0, B);
+ }
+
+ public DenseMatrix add(Matrix... matrices) throws IOException {
+ // ensure all the matrices are suitable for addition.
+ for (Matrix m : matrices) {
+ ensureForAddition(m);
+ }
+
+ DenseMatrix result = new DenseMatrix(config, this.getRows(), this
+ .getColumns());
+ JobConf jobConf = new JobConf(config);
+ jobConf.setJobName("addition MR job" + result.getPath());
+
+ jobConf.setNumMapTasks(config.getNumMapTasks());
+ jobConf.setNumReduceTasks(config.getNumReduceTasks());
+
+ StringBuilder summandList = new StringBuilder();
+ StringBuilder alphaList = new StringBuilder();
+ for (Matrix m : matrices) {
+ summandList.append(m.getPath());
+ summandList.append(",");
+ alphaList.append("1");
+ alphaList.append(",");
+ }
+ summandList.deleteCharAt(summandList.length() - 1);
+ alphaList.deleteCharAt(alphaList.length() - 1);
+
+ RowCyclicAdditionMap.initJob(this.getPath(), summandList.toString(),
+ alphaList.toString(), RowCyclicAdditionMap.class, IntWritable.class,
+ MapWritable.class, jobConf);
+ RowCyclicAdditionReduce.initJob(result.getPath(),
+ RowCyclicAdditionReduce.class, jobConf);
+
+ JobManager.execute(jobConf);
+ return result;
+ }
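
A short sketch of both addition entry points, under the same cluster assumptions as the earlier examples; each call launches a RowCyclicAddition MapReduce job and writes the result into a new matrix table:

    import java.io.IOException;
    import org.apache.hama.HamaConfiguration;
    import org.apache.hama.matrix.DenseMatrix;

    public class AdditionExample {
      public static void main(String[] args) throws IOException {
        HamaConfiguration conf = new HamaConfiguration();
        DenseMatrix a = DenseMatrix.identity(conf, 4, 4);
        DenseMatrix b = DenseMatrix.identity(conf, 4, 4);
        DenseMatrix c = DenseMatrix.identity(conf, 4, 4);

        // C1 = 2.0*B + A
        DenseMatrix c1 = a.add(2.0, b);
        // C2 = A + B + C; the vararg form adds every summand with alpha = 1.
        DenseMatrix c2 = a.add(b, c);

        System.out.println(c1.get(0, 0));   // 3.0
        System.out.println(c2.get(0, 0));   // 3.0
      }
    }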
+
+ private void ensureForAddition(Matrix m) throws IOException {
+ if (getRows() != m.getRows() || getColumns() != m.getColumns()) {
+ throw new IOException(
+ "Matrices must have the same number of rows and columns for A + B.");
+ }
+ }
+
+ /**
+ * C = A*B using the iterative method
+ *
+ * @param B
+ * @return C
+ * @throws IOException
+ */
+ public DenseMatrix mult(Matrix B) throws IOException {
+ ensureForMultiplication(B);
+ int columns = 0;
+ if(B.getColumns() == 1 || this.getColumns() == 1)
+ columns = 1;
+ else
+ columns = this.getColumns();
+
+ DenseMatrix result = new DenseMatrix(config, this.getRows(), columns);
+
+ for (int i = 0; i < this.getRows(); i++) {
+ JobConf jobConf = new JobConf(config);
+ jobConf.setJobName("multiplication MR job : " + result.getPath() + " "
+ + i);
+
+ jobConf.setNumMapTasks(config.getNumMapTasks());
+ jobConf.setNumReduceTasks(config.getNumReduceTasks());
+
+ DenseMatrixVectorMultMap.initJob(i, this.getPath(), B.getPath(),
+ DenseMatrixVectorMultMap.class, IntWritable.class, MapWritable.class,
+ jobConf);
+ DenseMatrixVectorMultReduce.initJob(result.getPath(),
+ DenseMatrixVectorMultReduce.class, jobConf);
+ JobManager.execute(jobConf);
+ }
+
+ return result;
+ }
+
+ /**
+ * C = A * B using the blocking algorithm
+ *
+ * @param B
+ * @param blocks the number of blocks
+ * @return C
+ * @throws IOException
+ */
+ public DenseMatrix mult(Matrix B, int blocks) throws IOException {
+ ensureForMultiplication(B);
+
+ String collectionTable = "collect_" + RandomVariable.randMatrixPath();
+ HTableDescriptor desc = new HTableDescriptor(collectionTable);
+ desc.addFamily(new HColumnDescriptor(Bytes.toBytes(Constants.BLOCK)));
+ this.admin.createTable(desc);
+ LOG.info("Collect Blocks");
+
+ collectBlocksMapRed(this.getPath(), collectionTable, blocks, true);
+ collectBlocksMapRed(B.getPath(), collectionTable, blocks, false);
+
+ DenseMatrix result = new DenseMatrix(config, this.getRows(), this
+ .getColumns());
+
+ JobConf jobConf = new JobConf(config);
+ jobConf.setJobName("multiplication MR job : " + result.getPath());
+
+ jobConf.setNumMapTasks(config.getNumMapTasks());
+ jobConf.setNumReduceTasks(config.getNumReduceTasks());
+
+ BlockMultiplyMap.initJob(collectionTable, BlockMultiplyMap.class,
+ BlockID.class, BlockWritable.class, jobConf);
+ BlockMultiplyReduce.initJob(result.getPath(), BlockMultiplyReduce.class,
+ jobConf);
+
+ JobManager.execute(jobConf);
+ hamaAdmin.delete(collectionTable);
+ return result;
+ }
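
A sketch contrasting the two multiplication paths (again assuming a running cluster): the iterative form launches one MapReduce job per row of A, while the blocked form first collects sub-blocks into a temporary table and then multiplies them in a single job. The block count passed to the blocked form is expected to be a perfect square, as checked in collectBlocksMapRed below.

    import java.io.IOException;
    import org.apache.hama.HamaConfiguration;
    import org.apache.hama.matrix.DenseMatrix;

    public class MultiplicationExample {
      public static void main(String[] args) throws IOException {
        HamaConfiguration conf = new HamaConfiguration();
        DenseMatrix a = DenseMatrix.identity(conf, 4, 4);
        DenseMatrix b = DenseMatrix.identity(conf, 4, 4);

        // Iterative method: one MapReduce job per row of A.
        DenseMatrix c1 = a.mult(b);

        // Blocking algorithm: 4 blocks, i.e. a 2 x 2 block grid.
        DenseMatrix c2 = a.mult(b, 4);

        System.out.println(c1.get(0, 0) + " " + c2.get(0, 0));
      }
    }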
+
+ private void ensureForMultiplication(Matrix m) throws IOException {
+ if (getColumns() != m.getRows()) {
+ throw new IOException("A's columns should equal with B's rows while A*B.");
+ }
+ }
+
+ /**
+ * C = alpha*A*B + C
+ *
+ * @param alpha
+ * @param B
+ * @param C
+ * @return C
+ * @throws IOException
+ */
+ public Matrix multAdd(double alpha, Matrix B, Matrix C) throws IOException {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ /**
+ * Computes the given norm of the matrix
+ *
+ * @param type
+ * @return norm of the matrix
+ * @throws IOException
+ */
+ public double norm(Norm type) throws IOException {
+ if (type == Norm.One)
+ return getNorm1();
+ else if (type == Norm.Frobenius)
+ return getFrobenius();
+ else if (type == Norm.Infinity)
+ return getInfinity();
+ else
+ return getMaxvalue();
+ }
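
For reference, and assuming the helper methods implement the usual definitions (getNorm1, getFrobenius, getInfinity, and getMaxvalue are not shown in this patch), the four matrix norms would be:

    \|A\|_{1} = \max_{j} \sum_{i} |a_{ij}|, \qquad
    \|A\|_{\infty} = \max_{i} \sum_{j} |a_{ij}|, \qquad
    \|A\|_{F} = \Big(\sum_{i,j} a_{ij}^{2}\Big)^{1/2}, \qquad
    \|A\|_{\max} = \max_{i,j} |a_{ij}|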
+
+ /**
+ * Returns type of matrix
+ */
+ public String getType() {
+ return this.getClass().getSimpleName();
+ }
+
+ /**
+ * Returns the sub matrix formed by selecting certain rows and columns from a
+ * bigger matrix. The sub matrix is an in-memory operation only.
+ *
+ * @param i0 the start index of row
+ * @param i1 the end index of row
+ * @param j0 the start index of column
+ * @param j1 the end index of column
+ * @return the sub matrix of matrix
+ * @throws IOException
+ */
+ public SubMatrix subMatrix(int i0, int i1, int j0, int j1) throws IOException {
+ int columnSize = (j1 - j0) + 1;
+ SubMatrix result = new SubMatrix((i1 - i0) + 1, columnSize);
+
+ byte[][] cols = new byte[columnSize][];
+ for (int j = j0, jj = 0; j <= j1; j++, jj++) {
+ cols[jj] = BytesUtil.getColumnIndex(j);
+ }
+
+ Scanner scan = table.getScanner(cols, BytesUtil.getRowIndex(i0), BytesUtil
+ .getRowIndex(i1 + 1));
+ Iterator<RowResult> it = scan.iterator();
+ int i = 0;
+ RowResult rs = null;
+ while (it.hasNext()) {
+ rs = it.next();
+ for (int j = j0, jj = 0; j <= j1; j++, jj++) {
+ result.set(i, jj, rs.get(BytesUtil.getColumnIndex(j)).getValue());
+ }
+ i++;
+ }
+
+ scan.close();
+ return result;
+ }
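
A small sketch of the inclusive index convention (not part of the patch, and assuming the selected region of the matrix is fully populated so the scan finds every cell):

    import java.io.IOException;
    import org.apache.hama.HamaConfiguration;
    import org.apache.hama.matrix.DenseMatrix;
    import org.apache.hama.matrix.SubMatrix;

    public class SubMatrixExample {
      public static void main(String[] args) throws IOException {
        HamaConfiguration conf = new HamaConfiguration();
        DenseMatrix m = DenseMatrix.identity(conf, 5, 5);

        // Rows 1..3 and columns 2..4, all bounds inclusive: a 3 x 3 block
        // held entirely in memory (no MapReduce job is launched).
        SubMatrix block = m.subMatrix(1, 3, 2, 4);
      }
    }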
+
+ /**
+ * Collect Blocks
+ *
+ * @param path an input path
+ * @param collectionTable the collection table
+ * @param blockNum the number of blocks
+ * @param bool true when collecting blocks of the left operand (this matrix), false for the right operand
+ * @throws IOException
+ */
+ public void collectBlocksMapRed(String path, String collectionTable,
+ int blockNum, boolean bool) throws IOException {
+ double blocks = Math.pow(blockNum, 0.5);
+ if (!String.valueOf(blocks).endsWith(".0"))
+ throw new IOException("can't divide.");
+
+ int block_size = (int) blocks;
+ JobConf jobConf = new JobConf(config);
+ jobConf.setJobName("Blocking MR job" + getPath());
+
+ jobConf.setNumMapTasks(config.getNumMapTasks());
+ jobConf.setNumReduceTasks(config.getNumReduceTasks());
+ jobConf.setMapperClass(CollectBlocksMapper.class);
+ jobConf.setInputFormat(VectorInputFormat.class);
+ jobConf.set(VectorInputFormat.COLUMN_LIST, Constants.COLUMN);
+
+ FileInputFormat.addInputPaths(jobConf, path);
+
+ CollectBlocksMapper.initJob(collectionTable, bool, block_size, this
+ .getRows(), this.getColumns(), jobConf);
+
+ JobManager.execute(jobConf);
+ }
+
+ /**
+ * Compute all the eigen values. Note: all the eigen values are collected in
+ * the "eival:value" column, and the eigen vector of a specified eigen value
+ * is collected in the "eivec:" column family in the same row.
+ *
+ * TODO: we may need to expose the interface to access the eigen values and
+ * vectors
+ *
+ * @param loops limit the loops of the computation
+ * @throws IOException
+ */
+ public void jacobiEigenValue(int loops) throws IOException {
+ JobConf jobConf = new JobConf(config);
+
+ /***************************************************************************
+ * Initialization
+ *
+ * An M/R job is used for initialization (such as preparing a matrix copy of
+ * the original in the "eicol:" family).
+ **************************************************************************/
+ // initialization
+ jobConf.setJobName("JacobiEigen initialization MR job" + getPath());
+
+ jobConf.setMapperClass(JacobiEigenValue.InitMapper.class);
+ jobConf.setInputFormat(VectorInputFormat.class);
+ jobConf.set(VectorInputFormat.COLUMN_LIST, Constants.COLUMN);
+
+ FileInputFormat.addInputPaths(jobConf, getPath());
+ jobConf.set(JacobiEigenValue.MATRIX, getPath());
+ jobConf.setOutputFormat(NullOutputFormat.class);
+ jobConf.setMapOutputKeyClass(IntWritable.class);
+ jobConf.setMapOutputValueClass(MapWritable.class);
+
+ JobManager.execute(jobConf);
+
+ final FileSystem fs = FileSystem.get(jobConf);
+ Pair pivotPair = new Pair();
+ DoubleWritable pivotWritable = new DoubleWritable();
+ VectorUpdate vu;
+
+ // loop
+ int size = this.getRows();
+ int state = size;
+ int pivot_row, pivot_col;
+ double pivot;
+ double s, c, t, y;
+
+ while (state != 0 && loops > 0) {
+ /*************************************************************************
+ * Find the pivot and its index(pivot_row, pivot_col)
+ *
+ * An M/R job is used to scan all the "eival:ind" to get the max absolute
+ * value of each row, and do a MAX aggregation of these max values to get
+ * the max value in the matrix.
+ ************************************************************************/
+ jobConf = new JobConf(config);
+ jobConf.setJobName("Find Pivot MR job" + getPath());
+
+ jobConf.setNumReduceTasks(1);
+
+ Path outDir = new Path(new Path(getType() + "_TMP_FindPivot_dir_"
+ + System.currentTimeMillis()), "out");
+ if (fs.exists(outDir))
+ fs.delete(outDir, true);
+
+ jobConf.setMapperClass(JacobiEigenValue.PivotMapper.class);
+ jobConf.setInputFormat(JacobiEigenValue.PivotInputFormat.class);
+ jobConf.set(JacobiEigenValue.PivotInputFormat.COLUMN_LIST,
+ JacobiEigenValue.EIIND);
+ FileInputFormat.addInputPaths(jobConf, getPath());
+ jobConf.setMapOutputKeyClass(Pair.class);
+ jobConf.setMapOutputValueClass(DoubleWritable.class);
+
+ jobConf.setOutputKeyClass(Pair.class);
+ jobConf.setOutputValueClass(DoubleWritable.class);
+ jobConf.setOutputFormat(SequenceFileOutputFormat.class);
+ FileOutputFormat.setOutputPath(jobConf, outDir);
+
+ // update the output dir of the job
+ outDir = FileOutputFormat.getOutputPath(jobConf);
+
+ JobManager.execute(jobConf);
+
+ // read outputs
+ Path inFile = new Path(outDir, "part-00000");
+ SequenceFile.Reader reader = new SequenceFile.Reader(fs, inFile, jobConf);
+ try {
+ reader.next(pivotPair, pivotWritable);
+ pivot_row = pivotPair.getRow();
+ pivot_col = pivotPair.getColumn();
+ pivot = pivotWritable.get();
+ } finally {
+ reader.close();
+ }
+ fs.delete(outDir.getParent(), true);
+
+ /*************************************************************************
+ * Calculation
+ *
+ * Compute the rotation parameters of next rotation.
+ ************************************************************************/
+ double e1 = BytesUtil.bytesToDouble(table.get(
+ BytesUtil.getRowIndex(pivot_row),
+ Bytes.toBytes(JacobiEigenValue.EIVAL)).getValue());
+ double e2 = BytesUtil.bytesToDouble(table.get(
+ BytesUtil.getRowIndex(pivot_col),
+ Bytes.toBytes(JacobiEigenValue.EIVAL)).getValue());
+
+ y = (e2 - e1) / 2;
+ t = Math.abs(y) + Math.sqrt(pivot * pivot + y * y);
+ s = Math.sqrt(pivot * pivot + t * t);
+ c = t / s;
+ s = pivot / s;
+ t = (pivot * pivot) / t;
+ if (y < 0) {
+ s = -s;
+ t = -t;
+ }
+
+ /*************************************************************************
+ * Update the pivot and the eigen values indexed by the pivot
+ ************************************************************************/
+ vu = new VectorUpdate(pivot_row);
+ vu.put(JacobiEigenValue.EICOL, pivot_col, 0);
+ table.commit(vu.getBatchUpdate());
+
+ state = update(pivot_row, -t, state);
+ state = update(pivot_col, t, state);
+
+ /*************************************************************************
+ * Rotate the matrix
+ ************************************************************************/
+ // rotation
+ jobConf = new JobConf(config);
+ jobConf.setJobName("Rotation Matrix MR job" + getPath());
+
+ jobConf.setInt(JacobiEigenValue.PIVOTROW, pivot_row);
+ jobConf.setInt(JacobiEigenValue.PIVOTCOL, pivot_col);
+ jobConf.set(JacobiEigenValue.PIVOTSIN, String.valueOf(s));
+ jobConf.set(JacobiEigenValue.PIVOTCOS, String.valueOf(c));
+
+ jobConf.setMapperClass(DummyMapper.class);
+ jobConf.setInputFormat(JacobiEigenValue.RotationInputFormat.class);
+ jobConf.set(JacobiEigenValue.RotationInputFormat.COLUMN_LIST,
+ JacobiEigenValue.EIIND);
+ FileInputFormat.addInputPaths(jobConf, getPath());
+ jobConf.setMapOutputKeyClass(NullWritable.class);
+ jobConf.setMapOutputValueClass(NullWritable.class);
+ FileInputFormat.addInputPaths(jobConf, getPath());
+ jobConf.setOutputFormat(NullOutputFormat.class);
+
+ JobManager.execute(jobConf);
+
+ // rotate eigenvectors
+ LOG.info("rotating eigenvector");
+ for (int i = 0; i < size; i++) {
+ e1 = BytesUtil.bytesToDouble(table.get(
+ BytesUtil.getRowIndex(pivot_row),
+ Bytes.toBytes(JacobiEigenValue.EIVEC + i)).getValue());
+ e2 = BytesUtil.bytesToDouble(table.get(
+ BytesUtil.getRowIndex(pivot_col),
+ Bytes.toBytes(JacobiEigenValue.EIVEC + i)).getValue());
+
+ vu = new VectorUpdate(pivot_row);
+ vu.put(JacobiEigenValue.EIVEC, i, c * e1 - s * e2);
+ table.commit(vu.getBatchUpdate());
+
+ vu = new VectorUpdate(pivot_col);
+ vu.put(JacobiEigenValue.EIVEC, i, s * e1 + c * e2);
+ table.commit(vu.getBatchUpdate());
+ }
+
+ LOG.info("update index...");
+ // update index array
+ maxind(pivot_row, size);
+ maxind(pivot_col, size);
+
+ loops--;
+ }
+ }
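
Restated as formulas, the loop body above computes the classical Jacobi rotation parameters and applies them to the eigenvalue estimates and eigenvector rows. With k = pivot_row, l = pivot_col, p = pivot, and e_k, e_l the current eigenvalue estimates:

    y = \frac{e_l - e_k}{2}, \quad
    t' = |y| + \sqrt{p^2 + y^2}, \quad
    r = \sqrt{p^2 + t'^2}, \quad
    c = \frac{t'}{r}, \quad
    s = \frac{p}{r}, \quad
    t = \frac{p^2}{t'}

(s and t are negated when y < 0), after which the sweep applies

    e_k \leftarrow e_k - t, \quad
    e_l \leftarrow e_l + t, \quad
    a_{kl} \leftarrow 0, \quad
    v_k \leftarrow c\,v_k - s\,v_l, \quad
    v_l \leftarrow s\,v_k + c\,v_l

where v_k and v_l are the eigenvector rows stored in the "eivec:" family.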
+
+ void maxind(int row, int size) throws IOException {
+ int m = row + 1;
+ if (row + 2 < size) {
+ double max = BytesUtil.bytesToDouble(table
+ .get(BytesUtil.getRowIndex(row),
+ Bytes.toBytes(JacobiEigenValue.EICOL + m)).getValue());
+ double val;
+ for (int i = row + 2; i < size; i++) {
+ val = BytesUtil.bytesToDouble(table.get(BytesUtil.getRowIndex(row),
+ Bytes.toBytes(JacobiEigenValue.EICOL + i)).getValue());
+ if (Math.abs(val) > Math.abs(max)) {
+ m = i;
+ max = val;
+ }
+ }
+ }
+
+ VectorUpdate vu = new VectorUpdate(row);
+ vu.put(JacobiEigenValue.EIIND, m);
+ table.commit(vu.getBatchUpdate());
+ }
+
+ int update(int row, double value, int state) throws IOException {
+ double e = BytesUtil.bytesToDouble(table.get(BytesUtil.getRowIndex(row),
+ Bytes.toBytes(JacobiEigenValue.EIVAL)).getValue());
+ int changed = BytesUtil.bytesToInt(table.get(BytesUtil.getRowIndex(row),
+ Bytes.toBytes(JacobiEigenValue.EICHANGED)).getValue());
+ double y = e;
+ e += value;
+
+ VectorUpdate vu = new VectorUpdate(row);
+ vu.put(JacobiEigenValue.EIVAL, e);
+ if (changed == 1 && (Math.abs(y - e) < .0000001)) { // y == e
+ changed = 0;
+ vu.put(JacobiEigenValue.EICHANGED, changed);
+ state--;
+ } else if (changed == 0 && (Math.abs(y - e) > .0000001)) {
+ changed = 1;
+ vu.put(JacobiEigenValue.EICHANGED, changed);
+ state++;
+ }
+ table.commit(vu.getBatchUpdate());
+ return state;
+ }
+
+ // for test
+ boolean verifyEigenValue(double[] e, double[][] E) throws IOException {
+ boolean success = true;
+ double e1, ev;
+ for (int i = 0; i < e.length; i++) {
+ e1 = BytesUtil.bytesToDouble(table.get(BytesUtil.getRowIndex(i),
+ Bytes.toBytes(JacobiEigenValue.EIVAL)).getValue());
+ success &= ((Math.abs(e1 - e[i]) < .0000001));
+ if (!success)
+ return success;
+
+ for (int j = 0; j < E[i].length; j++) {
+ ev = BytesUtil.bytesToDouble(table.get(BytesUtil.getRowIndex(i),
+ Bytes.toBytes(JacobiEigenValue.EIVEC + j)).getValue());
+ success &= ((Math.abs(ev - E[i][j]) < .0000001));
+ if (!success)
+ return success;
+ }
+ }
+ return success;
+ }
+}
Added: incubator/hama/trunk/src/java/org/apache/hama/matrix/DenseVector.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/matrix/DenseVector.java?rev=817156&view=auto
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/matrix/DenseVector.java (added)
+++ incubator/hama/trunk/src/java/org/apache/hama/matrix/DenseVector.java Mon Sep 21 04:40:10 2009
@@ -0,0 +1,312 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.matrix;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hadoop.hbase.io.RowResult;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.MapWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hama.Constants;
+import org.apache.hama.io.DoubleEntry;
+import org.apache.hama.matrix.Vector.Norm;
+import org.apache.log4j.Logger;
+
+/**
+ * This class represents a dense vector.
+ */
+public class DenseVector extends AbstractVector implements Vector {
+ static final Logger LOG = Logger.getLogger(DenseVector.class);
+
+ public DenseVector() {
+ this(new MapWritable());
+ }
+
+ public DenseVector(MapWritable m) {
+ this.entries = m;
+ }
+
+ public DenseVector(RowResult row) {
+ this.initMap(row);
+ }
+
+ public DenseVector(int row, MapWritable m) {
+ this.entries = m;
+ this.entries.put(Constants.ROWCOUNT, new IntWritable(row));
+ }
+
+ /**
+ * Sets the value of index
+ *
+ * @param index
+ * @param value
+ */
+ public void set(int index, double value) {
+ // If entries are null, create new object
+ if (this.entries == null) {
+ this.entries = new MapWritable();
+ }
+
+ this.entries.put(new IntWritable(index), new DoubleEntry(value));
+ }
+
+ /**
+ * Sets the vector
+ *
+ * @param v
+ * @return x = v
+ */
+ public DenseVector set(Vector v) {
+ this.set(1, v);
+ return this;
+ }
+
+ public Vector set(double alpha, Vector v) {
+ checkComformantSize(v);
+ boolean zeroFill = false;
+ if(alpha == 0)
+ zeroFill = true;
+
+ for (Map.Entry<Writable, Writable> e : v.getEntries().entrySet()) {
+ int key = ((IntWritable) e.getKey()).get();
+ if(zeroFill)
+ this.set(key, 0);
+ else
+ this.set(key, alpha * ((DoubleEntry) e.getValue()).getValue());
+ }
+
+ return this;
+ }
+
+ public void setRow(int row) {
+ this.entries.put(Constants.ROWCOUNT, new IntWritable(row));
+ }
+
+ /**
+ * Gets the value of index
+ *
+ * @param index
+ * @return the value of v(index)
+ * @throws IOException
+ */
+ public double get(int index) {
+ double value;
+ try {
+ value = ((DoubleEntry) this.entries.get(new IntWritable(index)))
+ .getValue();
+ } catch (NullPointerException e) {
+ throw new NullPointerException("Unexpected null value : " + e.toString());
+ }
+
+ return value;
+ }
+
+ public int getRow() {
+ return ((IntWritable) this.entries.get(Constants.ROWCOUNT)).get();
+ }
+
+ /**
+ * Adds the value to v(index)
+ *
+ * @param index
+ * @param value
+ */
+ public void add(int index, double value) {
+ set(index, get(index) + value);
+ }
+
+ /**
+ * x = alpha*v + x
+ *
+ * @param alpha
+ * @param v
+ * @return x = alpha*v + x
+ */
+ public DenseVector add(double alpha, Vector v) {
+ checkComformantSize(v);
+ if (alpha == 0)
+ return this;
+
+ for (Map.Entry<Writable, Writable> e : this.getEntries().entrySet()) {
+ int key = ((IntWritable) e.getKey()).get();
+ this.add(key, alpha * v.get(key));
+ }
+
+ return this;
+ }
+
+ /**
+ * x = v + x
+ *
+ * @param v2
+ * @return x = v + x
+ */
+ public DenseVector add(Vector v2) {
+ checkComformantSize(v2);
+
+ for (Map.Entry<Writable, Writable> e : this.getEntries().entrySet()) {
+ int key = ((IntWritable) e.getKey()).get();
+ this.add(key, v2.get(key));
+ }
+
+ return this;
+ }
+
+ /**
+ * Computes the cosine of the angle between x and v, i.e. the dot product of
+ * x and v divided by the product of their 2-norms.
+ *
+ * @param v
+ * @return cos(x, v)
+ */
+ public double dot(Vector v) {
+ checkComformantSize(v);
+
+ double cosine = 0.0;
+ double q_i, d_i;
+ for (int i = 0; i < Math.min(this.size(), v.size()); i++) {
+ q_i = v.get(i);
+ d_i = this.get(i);
+ cosine += q_i * d_i;
+ }
+ return cosine / (this.getNorm2() * ((DenseVector) v).getNorm2());
+ }
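
As implemented, the method normalizes the raw dot product by both 2-norms, so it returns a value in [-1, 1]. A quick in-memory sanity check might look like this (a sketch only; DenseVector needs no cluster since it wraps a MapWritable):

    import org.apache.hama.matrix.DenseVector;

    public class CosineExample {
      public static void main(String[] args) {
        DenseVector x = new DenseVector();
        DenseVector y = new DenseVector();
        x.set(0, 1.0); x.set(1, 0.0);
        y.set(0, 0.0); y.set(1, 1.0);

        // Orthogonal vectors: the raw dot product is 0, so the cosine is 0.
        System.out.println(x.dot(y));   // 0.0

        y.set(0, 1.0); y.set(1, 0.0);
        // Parallel vectors: the cosine is 1 regardless of their lengths.
        System.out.println(x.dot(y));   // 1.0
      }
    }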
+
+ /**
+ * v = alpha*v
+ *
+ * @param alpha
+ * @return v = alpha*v
+ */
+ public DenseVector scale(double alpha) {
+ for (Map.Entry<Writable, Writable> e : this.entries.entrySet()) {
+ this.entries.put(e.getKey(), new DoubleEntry(((DoubleEntry) e.getValue())
+ .getValue()
+ * alpha));
+ }
+ return this;
+ }
+
+ /**
+ * Computes the given norm of the vector
+ *
+ * @param type
+ * @return norm of the vector
+ */
+ public double norm(Norm type) {
+ if (type == Norm.One)
+ return getNorm1();
+ else if (type == Norm.Two)
+ return getNorm2();
+ else if (type == Norm.TwoRobust)
+ return getNorm2Robust();
+ else
+ return getNormInf();
+ }
+
+ protected double getNorm1() {
+ double sum = 0.0;
+
+ Set<Writable> keySet = this.entries.keySet();
+ Iterator<Writable> it = keySet.iterator();
+
+ while (it.hasNext()) {
+ sum += get(((IntWritable) it.next()).get());
+ }
+
+ return sum;
+ }
+
+ protected double getNorm2() {
+ double square_sum = 0.0;
+
+ Set<Writable> keySet = entries.keySet();
+ Iterator<Writable> it = keySet.iterator();
+
+ while (it.hasNext()) {
+ double value = get(((IntWritable) it.next()).get());
+ square_sum += value * value;
+ }
+
+ return Math.sqrt(square_sum);
+ }
+
+ /**
+ * Returns the two-norm of the vector, computed with scaling to avoid
+ * overflow and underflow (the robust two-norm)
+ *
+ * @return the robust two-norm of the vector
+ */
+ protected double getNorm2Robust() {
+ double scale = 0, ssq = 1;
+ for (int i = 0; i < this.size(); i++) {
+ double val = get(i);
+ if (val != 0) {
+ double absxi = Math.abs(val);
+ if (scale < absxi) {
+ ssq = 1 + ssq * Math.pow(scale / absxi, 2);
+ scale = absxi;
+ } else
+ ssq = ssq + Math.pow(absxi / scale, 2);
+ }
+ }
+ return scale * Math.sqrt(ssq);
+ }
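
The scaled accumulation above is the usual overflow- and underflow-safe way of computing the two-norm (the same idea used by the BLAS dnrm2 routine): scale tracks the largest |x_i| seen so far and ssq the sum of squares rescaled by it, so the norm is recovered as

    \|x\|_2 = \mathrm{scale} \cdot \sqrt{\mathrm{ssq}}
            = \mathrm{scale} \cdot \sqrt{\sum_i \Big(\frac{x_i}{\mathrm{scale}}\Big)^2}

and no intermediate square ever exceeds 1 for the running maximum element.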
+
+ /**
+ * Returns the infinity norm of the vector
+ *
+ * @return the infinity norm of the vector
+ */
+ protected double getNormInf() {
+ double max = 0.0;
+ for (int i = 0; i < this.size(); i++) {
+ max = Math.max(max, Math.abs(get(i)));
+ }
+ return max;
+ }
+
+ /**
+ * Returns a sub-vector.
+ *
+ * @param i0 the index of the first element
+ * @param i1 the index of the last element (inclusive)
+ * @return v[i0:i1]
+ */
+ public DenseVector subVector(int i0, int i1) {
+ DenseVector res = new DenseVector();
+ if (this.entries.containsKey(Constants.ROWCOUNT))
+ res.setRow(this.getRow());
+
+ for (int i = i0; i <= i1; i++) {
+ res.set(i, get(i));
+ }
+
+ return res;
+ }
+
+ public void zeroFill(int size) {
+ for(int i = 0; i < size; i++) {
+ this.set(i, 0);
+ }
+ }
+}
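
A minimal sketch exercising the in-memory vector operations end to end; no HBase cluster is needed because DenseVector only wraps a MapWritable:

    import org.apache.hama.matrix.DenseVector;
    import org.apache.hama.matrix.Vector;

    public class DenseVectorExample {
      public static void main(String[] args) {
        DenseVector v = new DenseVector();
        v.zeroFill(4);                     // (0, 0, 0, 0)
        v.set(1, 3.0);
        v.set(2, 4.0);                     // (0, 3, 4, 0)

        // x = 2*w + x : scaled accumulation into an existing vector.
        DenseVector w = new DenseVector();
        w.zeroFill(4);
        w.set(0, 1.0);
        v.add(2.0, w);                     // (2, 3, 4, 0)

        System.out.println(v.norm(Vector.Norm.Two));    // sqrt(4 + 9 + 16)
        System.out.println(v.subVector(1, 2).get(2));   // 4.0
      }
    }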