You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by ed...@apache.org on 2009/02/16 05:35:24 UTC
svn commit: r744798 [2/2] - in /incubator/hama/trunk: ./
src/examples/org/apache/hama/examples/ src/java/org/apache/hama/
src/java/org/apache/hama/algebra/ src/java/org/apache/hama/io/
src/java/org/apache/hama/mapred/ src/test/org/apache/hama/ src/test...
Modified: incubator/hama/trunk/src/java/org/apache/hama/algebra/SIMDMultiplyReduce.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/algebra/SIMDMultiplyReduce.java?rev=744798&r1=744797&r2=744798&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/algebra/SIMDMultiplyReduce.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/algebra/SIMDMultiplyReduce.java Mon Feb 16 04:35:23 2009
@@ -1,69 +1,69 @@
-/**
- * Copyright 2007 The Apache Software Foundation
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hama.algebra;
-
-import java.io.IOException;
-import java.util.Iterator;
-
-import org.apache.hadoop.hbase.io.BatchUpdate;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.MapReduceBase;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reducer;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.hama.io.VectorUpdate;
-import org.apache.hama.io.VectorWritable;
-import org.apache.hama.mapred.VectorOutputFormat;
-import org.apache.log4j.Logger;
-
-public class SIMDMultiplyReduce extends MapReduceBase implements
- Reducer<IntWritable, VectorWritable, IntWritable, VectorUpdate> {
- static final Logger LOG = Logger.getLogger(SIMDMultiplyReduce.class);
-
- /**
- * Use this before submitting a TableReduce job. It will appropriately set up
- * the JobConf.
- *
- * @param table
- * @param reducer
- * @param job
- */
- public static void initJob(String table,
- Class<SIMDMultiplyReduce> reducer, JobConf job) {
- job.setOutputFormat(VectorOutputFormat.class);
- job.setReducerClass(reducer);
- job.set(VectorOutputFormat.OUTPUT_TABLE, table);
- job.setOutputKeyClass(IntWritable.class);
- job.setOutputValueClass(BatchUpdate.class);
- }
-
- @Override
- public void reduce(IntWritable key, Iterator<VectorWritable> values,
- OutputCollector<IntWritable, VectorUpdate> output, Reporter reporter)
- throws IOException {
-
- VectorUpdate update = new VectorUpdate(key.get());
- update.putAll(values.next().entrySet());
-
- output.collect(key, update);
- }
-
-}
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.algebra;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.hadoop.hbase.io.BatchUpdate;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.MapWritable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hama.io.VectorUpdate;
+import org.apache.hama.mapred.VectorOutputFormat;
+import org.apache.log4j.Logger;
+
+public class SIMDMultiplyReduce extends MapReduceBase implements
+ Reducer<IntWritable, MapWritable, IntWritable, VectorUpdate> {
+ static final Logger LOG = Logger.getLogger(SIMDMultiplyReduce.class);
+
+ /**
+ * Use this before submitting a TableReduce job. It will appropriately set up
+ * the JobConf.
+ *
+ * @param table
+ * @param reducer
+ * @param job
+ */
+ public static void initJob(String table,
+ Class<SIMDMultiplyReduce> reducer, JobConf job) {
+ job.setOutputFormat(VectorOutputFormat.class);
+ job.setReducerClass(reducer);
+ job.set(VectorOutputFormat.OUTPUT_TABLE, table);
+ job.setOutputKeyClass(IntWritable.class);
+ job.setOutputValueClass(BatchUpdate.class);
+ }
+
+ @Override
+ public void reduce(IntWritable key, Iterator<MapWritable> values,
+ OutputCollector<IntWritable, VectorUpdate> output, Reporter reporter)
+ throws IOException {
+
+ VectorUpdate update = new VectorUpdate(key.get());
+ update.putAll(values.next());
+
+ output.collect(key, update);
+ }
+
+}
Modified: incubator/hama/trunk/src/java/org/apache/hama/io/VectorUpdate.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/io/VectorUpdate.java?rev=744798&r1=744797&r2=744798&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/io/VectorUpdate.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/io/VectorUpdate.java Mon Feb 16 04:35:23 2009
@@ -25,6 +25,9 @@
import org.apache.hadoop.hbase.io.BatchUpdate;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.MapWritable;
+import org.apache.hadoop.io.Writable;
import org.apache.hama.Constants;
import org.apache.hama.util.BytesUtil;
@@ -76,4 +79,10 @@
put(e.getKey(), e.getValue().getValue());
}
}
+
+ public void putAll(MapWritable entries) {
+ for (Map.Entry<Writable, Writable> e : entries.entrySet()) {
+ put(((IntWritable) e.getKey()).get(), ((DoubleEntry) e.getValue()).getValue());
+ }
+ }
}
Modified: incubator/hama/trunk/src/java/org/apache/hama/mapred/CollectBlocksMap.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/mapred/CollectBlocksMap.java?rev=744798&r1=744797&r2=744798&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/mapred/CollectBlocksMap.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/mapred/CollectBlocksMap.java Mon Feb 16 04:35:23 2009
@@ -16,11 +16,11 @@
*/
package org.apache.hama.mapred;
+import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hama.io.BlockID;
-import org.apache.hama.io.VectorWritable;
/**
* Scan an table/file to collect blocks.
@@ -30,5 +30,5 @@
*/
@SuppressWarnings("unchecked")
public interface CollectBlocksMap<K extends WritableComparable, V extends Writable>
- extends Mapper<K, V, BlockID, VectorWritable> {
+ extends Mapper<K, V, BlockID, MapWritable> {
}
Modified: incubator/hama/trunk/src/java/org/apache/hama/mapred/CollectBlocksMapReduceBase.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/mapred/CollectBlocksMapReduceBase.java?rev=744798&r1=744797&r2=744798&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/mapred/CollectBlocksMapReduceBase.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/mapred/CollectBlocksMapReduceBase.java Mon Feb 16 04:35:23 2009
@@ -16,11 +16,11 @@
*/
package org.apache.hama.mapred;
+import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hama.io.BlockID;
import org.apache.hama.io.BlockWritable;
-import org.apache.hama.io.VectorWritable;
/**
* Abstract Blocking Map/Reduce Class to configure the job.
@@ -59,7 +59,7 @@
int block_size, int i, int j, JobConf job) {
job.setReducerClass(CollectBlocksReducer.class);
job.setMapOutputKeyClass(BlockID.class);
- job.setMapOutputValueClass(VectorWritable.class);
+ job.setMapOutputValueClass(MapWritable.class);
job.setOutputFormat(BlockOutputFormat.class);
job.setOutputKeyClass(BlockID.class);
Modified: incubator/hama/trunk/src/java/org/apache/hama/mapred/CollectBlocksMapper.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/mapred/CollectBlocksMapper.java?rev=744798&r1=744797&r2=744798&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/mapred/CollectBlocksMapper.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/mapred/CollectBlocksMapper.java Mon Feb 16 04:35:23 2009
@@ -1,56 +1,54 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership. The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package org.apache.hama.mapred;
-
-import java.io.IOException;
-
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.hama.DenseVector;
-import org.apache.hama.io.BlockID;
-import org.apache.hama.io.VectorWritable;
-
-/**
- * A Map/Reduce help class for blocking a DenseMatrix to a block-formated matrix
- */
-public class CollectBlocksMapper extends CollectBlocksMapReduceBase implements
- CollectBlocksMap<IntWritable, VectorWritable> {
-
- @Override
- public void map(IntWritable key, VectorWritable value,
- OutputCollector<BlockID, VectorWritable> output, Reporter reporter)
- throws IOException {
- int startColumn;
- int endColumn;
- int blkRow = key.get() / mBlockRowSize;
- DenseVector dv = value.getDenseVector();
-
- int i = 0;
- do {
- startColumn = i * mBlockColSize;
- endColumn = startColumn + mBlockColSize - 1;
- if (endColumn >= mColumns) // the last sub vector
- endColumn = mColumns - 1;
- output.collect(new BlockID(blkRow, i), new VectorWritable(key.get(), dv
- .subVector(startColumn, endColumn)));
-
- i++;
- } while (endColumn < (mColumns - 1));
- }
-
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hama.mapred;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.MapWritable;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hama.DenseVector;
+import org.apache.hama.io.BlockID;
+import org.apache.log4j.Logger;
+
+/**
+ * A Map/Reduce help class for blocking a DenseMatrix to a block-formated matrix
+ */
+public class CollectBlocksMapper extends CollectBlocksMapReduceBase implements
+ CollectBlocksMap<IntWritable, MapWritable> {
+ static final Logger LOG = Logger.getLogger(CollectBlocksMapper.class);
+
+ @Override
+ public void map(IntWritable key, MapWritable value,
+ OutputCollector<BlockID, MapWritable> output, Reporter reporter)
+ throws IOException {
+ int startColumn, endColumn, blkRow = key.get() / mBlockRowSize, i = 0;
+ DenseVector dv = new DenseVector(key.get(), value);
+
+ do {
+ startColumn = i * mBlockColSize;
+ endColumn = startColumn + mBlockColSize - 1;
+ if (endColumn >= mColumns) // the last sub vector
+ endColumn = mColumns - 1;
+ output.collect(new BlockID(blkRow, i), dv.subVector(startColumn, endColumn).getEntries());
+
+ i++;
+ } while (endColumn < (mColumns - 1));
+ }
+
+}
Modified: incubator/hama/trunk/src/java/org/apache/hama/mapred/CollectBlocksReducer.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/mapred/CollectBlocksReducer.java?rev=744798&r1=744797&r2=744798&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/mapred/CollectBlocksReducer.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/mapred/CollectBlocksReducer.java Mon Feb 16 04:35:23 2009
@@ -19,13 +19,16 @@
import java.io.IOException;
import java.util.Iterator;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
+import org.apache.hama.DenseVector;
import org.apache.hama.SubMatrix;
import org.apache.hama.io.BlockID;
import org.apache.hama.io.BlockWritable;
-import org.apache.hama.io.VectorWritable;
/**
* Rows are named as c(i, j) with sequential number ((N^2 * i) + ((j * N) + k)
@@ -33,10 +36,11 @@
* b(k, j).
*/
public class CollectBlocksReducer extends CollectBlocksMapReduceBase implements
- Reducer<BlockID, VectorWritable, BlockID, BlockWritable> {
-
+ Reducer<BlockID, MapWritable, BlockID, BlockWritable> {
+ static final Log LOG = LogFactory.getLog(CollectBlocksReducer.class);
+
@Override
- public void reduce(BlockID key, Iterator<VectorWritable> values,
+ public void reduce(BlockID key, Iterator<MapWritable> values,
OutputCollector<BlockID, BlockWritable> output, Reporter reporter)
throws IOException {
// Note: all the sub-vectors are grouped by {@link
@@ -60,11 +64,12 @@
// i, j is the current offset in the sub-matrix
int i = 0, j = 0;
while (values.hasNext()) {
- VectorWritable vw = values.next();
+ DenseVector vw = new DenseVector(values.next());
// check the size is suitable
if (vw.size() != smCols)
throw new IOException("Block Column Size dismatched.");
- i = vw.row - rowBase;
+ i = vw.getRow() - rowBase;
+
if (i >= smRows || i < 0)
throw new IOException("Block Row Size dismatched.");
Modified: incubator/hama/trunk/src/java/org/apache/hama/mapred/RandomMatrixMap.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/mapred/RandomMatrixMap.java?rev=744798&r1=744797&r2=744798&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/mapred/RandomMatrixMap.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/mapred/RandomMatrixMap.java Mon Feb 16 04:35:23 2009
@@ -1,60 +1,60 @@
-/**
- * Copyright 2007 The Apache Software Foundation
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hama.mapred;
-
-import java.io.IOException;
-
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.MapReduceBase;
-import org.apache.hadoop.mapred.Mapper;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.hama.DenseVector;
-import org.apache.hama.io.VectorWritable;
-import org.apache.hama.util.RandomVariable;
-import org.apache.log4j.Logger;
-
-/**
- * Generate matrix with random elements
- */
-public class RandomMatrixMap extends MapReduceBase implements
- Mapper<IntWritable, IntWritable, IntWritable, VectorWritable> {
- static final Logger LOG = Logger.getLogger(RandomMatrixMap.class);
- protected int column;
- protected DenseVector vector = new DenseVector();
-
- @Override
- public void map(IntWritable key, IntWritable value,
- OutputCollector<IntWritable, VectorWritable> output, Reporter report)
- throws IOException {
- vector.clear();
- for (int i = key.get(); i <= value.get(); i++) {
- for (int j = 0; j < column; j++) {
- vector.set(j, RandomVariable.rand());
- }
- output.collect(new IntWritable(i), new VectorWritable(i, vector));
- }
- }
-
- public void configure(JobConf job) {
- column = Integer.parseInt(job.get("matrix.column"));
- }
-}
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.mapred;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.MapWritable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hama.DenseVector;
+import org.apache.hama.util.RandomVariable;
+import org.apache.log4j.Logger;
+
+/**
+ * Generate matrix with random elements
+ */
+public class RandomMatrixMap extends MapReduceBase implements
+ Mapper<IntWritable, IntWritable, IntWritable, MapWritable> {
+ static final Logger LOG = Logger.getLogger(RandomMatrixMap.class);
+ protected int column;
+ protected DenseVector vector = new DenseVector();
+
+ @Override
+ public void map(IntWritable key, IntWritable value,
+ OutputCollector<IntWritable, MapWritable> output, Reporter report)
+ throws IOException {
+ vector.clear();
+ for (int i = key.get(); i <= value.get(); i++) {
+ for (int j = 0; j < column; j++) {
+ vector.set(j, RandomVariable.rand());
+ }
+ output.collect(new IntWritable(i), vector.getEntries());
+ }
+ }
+
+ public void configure(JobConf job) {
+ column = Integer.parseInt(job.get("matrix.column"));
+ }
+}
Modified: incubator/hama/trunk/src/java/org/apache/hama/mapred/RandomMatrixReduce.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/mapred/RandomMatrixReduce.java?rev=744798&r1=744797&r2=744798&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/mapred/RandomMatrixReduce.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/mapred/RandomMatrixReduce.java Mon Feb 16 04:35:23 2009
@@ -1,69 +1,69 @@
-/**
- * Copyright 2007 The Apache Software Foundation
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hama.mapred;
-
-import java.io.IOException;
-import java.util.Iterator;
-
-import org.apache.hadoop.hbase.io.BatchUpdate;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.MapReduceBase;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reducer;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.hama.io.VectorUpdate;
-import org.apache.hama.io.VectorWritable;
-import org.apache.log4j.Logger;
-
-public class RandomMatrixReduce extends MapReduceBase implements
- Reducer<IntWritable, VectorWritable, IntWritable, VectorUpdate> {
- static final Logger LOG = Logger.getLogger(RandomMatrixReduce.class);
-
- /**
- * Use this before submitting a TableReduce job. It will appropriately set up
- * the JobConf.
- *
- * @param table
- * @param reducer
- * @param job
- */
- public static void initJob(String table, Class<RandomMatrixReduce> reducer,
- JobConf job) {
- job.setOutputFormat(VectorOutputFormat.class);
- job.setReducerClass(reducer);
- job.set(VectorOutputFormat.OUTPUT_TABLE, table);
- job.setOutputKeyClass(IntWritable.class);
- job.setOutputValueClass(BatchUpdate.class);
- }
-
- @Override
- public void reduce(IntWritable key, Iterator<VectorWritable> values,
- OutputCollector<IntWritable, VectorUpdate> output, Reporter reporter)
- throws IOException {
- VectorUpdate update = new VectorUpdate(key.get());
- update.putAll(values.next().entrySet());
- output.collect(key, update);
-
- if(values.hasNext())
- throw new IOException("Unexpected data.");
- }
-
-}
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.mapred;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.hadoop.hbase.io.BatchUpdate;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.MapWritable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hama.io.VectorUpdate;
+import org.apache.log4j.Logger;
+
+public class RandomMatrixReduce extends MapReduceBase implements
+ Reducer<IntWritable, MapWritable, IntWritable, VectorUpdate> {
+ static final Logger LOG = Logger.getLogger(RandomMatrixReduce.class);
+
+ /**
+ * Use this before submitting a TableReduce job. It will appropriately set up
+ * the JobConf.
+ *
+ * @param table
+ * @param reducer
+ * @param job
+ */
+ public static void initJob(String table, Class<RandomMatrixReduce> reducer,
+ JobConf job) {
+ job.setOutputFormat(VectorOutputFormat.class);
+ job.setReducerClass(reducer);
+ job.set(VectorOutputFormat.OUTPUT_TABLE, table);
+ job.setOutputKeyClass(IntWritable.class);
+ job.setOutputValueClass(BatchUpdate.class);
+ }
+
+ @Override
+ public void reduce(IntWritable key, Iterator<MapWritable> values,
+ OutputCollector<IntWritable, VectorUpdate> output, Reporter reporter)
+ throws IOException {
+ VectorUpdate update = new VectorUpdate(key.get());
+ update.putAll(values.next());
+ output.collect(key, update);
+
+ if(values.hasNext())
+ throw new IOException("Unexpected data.");
+ }
+
+}
Modified: incubator/hama/trunk/src/java/org/apache/hama/mapred/VectorInputFormat.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/mapred/VectorInputFormat.java?rev=744798&r1=744797&r2=744798&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/mapred/VectorInputFormat.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/mapred/VectorInputFormat.java Mon Feb 16 04:35:23 2009
@@ -1,168 +1,168 @@
-/**
- * Copyright 2007 The Apache Software Foundation
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hama.mapred;
-
-import java.io.IOException;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.UnknownScannerException;
-import org.apache.hadoop.hbase.io.RowResult;
-import org.apache.hadoop.hbase.mapred.TableSplit;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.Writables;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.mapred.InputFormat;
-import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.JobConfigurable;
-import org.apache.hadoop.mapred.RecordReader;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.util.StringUtils;
-import org.apache.hama.io.VectorWritable;
-import org.apache.hama.util.BytesUtil;
-
-public class VectorInputFormat extends HTableInputFormatBase implements
- InputFormat<IntWritable, VectorWritable>, JobConfigurable {
- static final Log LOG = LogFactory.getLog(VectorInputFormat.class);
- private TableRecordReader tableRecordReader;
-
- /**
- * Iterate over an HBase table data, return (IntWritable, VectorWritable) pairs
- */
- protected static class TableRecordReader extends HTableRecordReaderBase
- implements RecordReader<IntWritable, VectorWritable> {
-
- private int totalRows;
- private int processedRows;
-
- @Override
- public void init() throws IOException {
- super.init();
- if(endRow.length == 0) { // the last split, we don't know the end row
- totalRows = 0; // so we just skip it.
- } else {
- if(startRow.length == 0) { // the first split, start row is 0
- totalRows = BytesUtil.bytesToInt(endRow);
- } else {
- totalRows = BytesUtil.bytesToInt(endRow) - BytesUtil.bytesToInt(startRow);
- }
- }
- processedRows = 0;
- LOG.info("Split (" + Bytes.toString(startRow) + ", " + Bytes.toString(endRow) +
- ") -> " + totalRows);
- }
-
-
- /**
- * @return IntWritable
- *
- * @see org.apache.hadoop.mapred.RecordReader#createKey()
- */
- public IntWritable createKey() {
- return new IntWritable();
- }
-
- /**
- * @return VectorWritable
- *
- * @see org.apache.hadoop.mapred.RecordReader#createValue()
- */
- public VectorWritable createValue() {
- return new VectorWritable();
- }
-
- /**
- * @param key IntWritable as input key.
- * @param value VectorWritable as input value
- *
- * Converts Scanner.next() to IntWritable, VectorWritable
- *
- * @return true if there was more data
- * @throws IOException
- */
- public boolean next(IntWritable key, VectorWritable value)
- throws IOException {
- RowResult result;
- try {
- result = this.scanner.next();
- } catch (UnknownScannerException e) {
- LOG.debug("recovered from " + StringUtils.stringifyException(e));
- restart(lastRow);
- this.scanner.next(); // skip presumed already mapped row
- result = this.scanner.next();
- }
-
- boolean hasMore = result != null && result.size() > 0;
- if (hasMore) {
- byte[] row = result.getRow();
- key.set(BytesUtil.bytesToInt(row));
- lastRow = row;
- Writables.copyWritable(result, value);
- processedRows++;
- }
- return hasMore;
- }
-
- @Override
- public float getProgress() {
- if(totalRows <= 0) {
- return 0;
- } else {
- return Math.min(1.0f, processedRows / (float)totalRows);
- }
- }
-
- }
-
- /**
- * Builds a TableRecordReader. If no TableRecordReader was provided, uses the
- * default.
- *
- * @see org.apache.hadoop.mapred.InputFormat#getRecordReader(InputSplit,
- * JobConf, Reporter)
- */
- public RecordReader<IntWritable, VectorWritable> getRecordReader(
- InputSplit split, JobConf job, Reporter reporter) throws IOException {
- TableSplit tSplit = (TableSplit) split;
- TableRecordReader trr = this.tableRecordReader;
- // if no table record reader was provided use default
- if (trr == null) {
- trr = new TableRecordReader();
- }
- trr.setStartRow(tSplit.getStartRow());
- trr.setEndRow(tSplit.getEndRow());
- trr.setHTable(this.table);
- trr.setInputColumns(this.inputColumns);
- trr.setRowFilter(this.rowFilter);
- trr.init();
- return trr;
- }
-
- /**
- * Allows subclasses to set the {@link TableRecordReader}.
- *
- * @param tableRecordReader to provide other {@link TableRecordReader}
- * implementations.
- */
- protected void setTableRecordReader(TableRecordReader tableRecordReader) {
- this.tableRecordReader = tableRecordReader;
- }
-}
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.mapred;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.UnknownScannerException;
+import org.apache.hadoop.hbase.io.RowResult;
+import org.apache.hadoop.hbase.mapred.TableSplit;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Writables;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.MapWritable;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobConfigurable;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hama.DenseVector;
+import org.apache.hama.util.BytesUtil;
+
+public class VectorInputFormat extends HTableInputFormatBase implements
+ InputFormat<IntWritable, MapWritable>, JobConfigurable {
+ static final Log LOG = LogFactory.getLog(VectorInputFormat.class);
+ private TableRecordReader tableRecordReader;
+
+ /**
+ * Iterate over an HBase table data, return (IntWritable, MapWritable) pairs
+ */
+ protected static class TableRecordReader extends HTableRecordReaderBase
+ implements RecordReader<IntWritable, MapWritable> {
+
+ private int totalRows;
+ private int processedRows;
+
+ @Override
+ public void init() throws IOException {
+ super.init();
+ if(endRow.length == 0) { // the last split, we don't know the end row
+ totalRows = 0; // so we just skip it.
+ } else {
+ if(startRow.length == 0) { // the first split, start row is 0
+ totalRows = BytesUtil.bytesToInt(endRow);
+ } else {
+ totalRows = BytesUtil.bytesToInt(endRow) - BytesUtil.bytesToInt(startRow);
+ }
+ }
+ processedRows = 0;
+ LOG.info("Split (" + Bytes.toString(startRow) + ", " + Bytes.toString(endRow) +
+ ") -> " + totalRows);
+ }
+
+
+ /**
+ * @return IntWritable
+ *
+ * @see org.apache.hadoop.mapred.RecordReader#createKey()
+ */
+ public IntWritable createKey() {
+ return new IntWritable();
+ }
+
+ /**
+ * @return MapWritable
+ *
+ * @see org.apache.hadoop.mapred.RecordReader#createValue()
+ */
+ public MapWritable createValue() {
+ return new MapWritable();
+ }
+
+ /**
+ * @param key IntWritable as input key.
+ * @param value MapWritable as input value
+ *
+ * Converts Scanner.next() to IntWritable, MapWritable
+ *
+ * @return true if there was more data
+ * @throws IOException
+ */
+ public boolean next(IntWritable key, MapWritable value)
+ throws IOException {
+ RowResult result;
+ try {
+ result = this.scanner.next();
+ } catch (UnknownScannerException e) {
+ LOG.debug("recovered from " + StringUtils.stringifyException(e));
+ restart(lastRow);
+ this.scanner.next(); // skip presumed already mapped row
+ result = this.scanner.next();
+ }
+
+ boolean hasMore = result != null && result.size() > 0;
+ if (hasMore) {
+ byte[] row = result.getRow();
+ key.set(BytesUtil.bytesToInt(row));
+ lastRow = row;
+ Writables.copyWritable(new DenseVector(result).getEntries(), value);
+ processedRows++;
+ }
+ return hasMore;
+ }
+
+ @Override
+ public float getProgress() {
+ if(totalRows <= 0) {
+ return 0;
+ } else {
+ return Math.min(1.0f, processedRows / (float)totalRows);
+ }
+ }
+ }
+
+ /**
+ * Builds a TableRecordReader. If no TableRecordReader was provided, uses the
+ * default.
+ *
+ * @see org.apache.hadoop.mapred.InputFormat#getRecordReader(InputSplit,
+ * JobConf, Reporter)
+ */
+ public RecordReader<IntWritable, MapWritable> getRecordReader(
+ InputSplit split, JobConf job, Reporter reporter) throws IOException {
+ TableSplit tSplit = (TableSplit) split;
+ TableRecordReader trr = this.tableRecordReader;
+ // if no table record reader was provided use default
+ if (trr == null) {
+ trr = new TableRecordReader();
+ }
+ trr.setStartRow(tSplit.getStartRow());
+ trr.setEndRow(tSplit.getEndRow());
+ trr.setHTable(this.table);
+ trr.setInputColumns(this.inputColumns);
+ trr.setRowFilter(this.rowFilter);
+ trr.init();
+ return trr;
+ }
+
+ /**
+ * Allows subclasses to set the {@link TableRecordReader}.
+ *
+ * @param tableRecordReader to provide other {@link TableRecordReader}
+ * implementations.
+ */
+ protected void setTableRecordReader(TableRecordReader tableRecordReader) {
+ this.tableRecordReader = tableRecordReader;
+ }
+}
Modified: incubator/hama/trunk/src/test/org/apache/hama/TestDenseMatrix.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/test/org/apache/hama/TestDenseMatrix.java?rev=744798&r1=744797&r2=744798&view=diff
==============================================================================
--- incubator/hama/trunk/src/test/org/apache/hama/TestDenseMatrix.java (original)
+++ incubator/hama/trunk/src/test/org/apache/hama/TestDenseMatrix.java Mon Feb 16 04:35:23 2009
@@ -1,341 +1,342 @@
-/**
- * Copyright 2007 The Apache Software Foundation
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hama;
-
-import java.io.IOException;
-import java.util.Iterator;
-
-import junit.extensions.TestSetup;
-import junit.framework.Test;
-import junit.framework.TestCase;
-import junit.framework.TestSuite;
-
-import org.apache.hadoop.hbase.client.HBaseAdmin;
-import org.apache.hama.io.DoubleEntry;
-import org.apache.hama.util.RandomVariable;
-import org.apache.log4j.Logger;
-
-/**
- * Matrix test
- */
-public class TestDenseMatrix extends TestCase {
- static final Logger LOG = Logger.getLogger(TestDenseMatrix.class);
- private static int SIZE = 10;
- private static Matrix m1;
- private static Matrix m2;
- private final static String aliase1 = "matrix_aliase_A";
- private final static String aliase2 = "matrix_aliase_B";
- private static HamaConfiguration conf;
- private static HBaseAdmin admin;
- private static HamaAdmin hamaAdmin;
-
- public static Test suite() {
- TestSetup setup = new TestSetup(new TestSuite(TestDenseMatrix.class)) {
- protected void setUp() throws Exception {
- HCluster hCluster = new HCluster();
- hCluster.setUp();
-
- conf = hCluster.getConf();
- admin = new HBaseAdmin(conf);
- hamaAdmin = new HamaAdminImpl(conf, admin);
-
- m1 = DenseMatrix.random(hCluster.getConf(), SIZE, SIZE);
- m2 = DenseMatrix.random(hCluster.getConf(), SIZE, SIZE);
- }
-
- protected void tearDown() {
- try {
- closeTest();
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
- };
- return setup;
- }
-
- public static void closeTest() throws IOException {
- m1.close();
- m2.close();
- }
-
- public void testEntryAdd() throws IOException {
- double origin = m1.get(1, 1);
- m1.add(1, 1, 0.5);
-
- assertEquals(m1.get(1, 1), origin + 0.5);
- }
-
- /**
- * Column vector test.
- *
- * @param rand
- * @throws IOException
- */
- public void testGetColumn() throws IOException {
- Vector v = m1.getColumn(0);
- Iterator<DoubleEntry> it = v.iterator();
- int x = 0;
- while (it.hasNext()) {
- assertEquals(m1.get(x, 0), it.next().getValue());
- x++;
- }
- }
-
- public void testGetSetAttribute() throws IOException {
- m1.setRowLabel(0, "row1");
- assertEquals(m1.getRowLabel(0), "row1");
- assertEquals(m1.getRowLabel(1), null);
-
- m1.setColumnLabel(0, "column1");
- assertEquals(m1.getColumnLabel(0), "column1");
- assertEquals(m1.getColumnLabel(1), null);
- }
-
- public void testSubMatrix() throws IOException {
- SubMatrix a = m1.subMatrix(2, 4, 2, 5); // A : 3 * 4
- for (int i = 0; i < a.getRows(); i++) {
- for (int j = 0; j < a.getColumns(); j++) {
- assertEquals(a.get(i, j), m1.get(i + 2, j + 2));
- }
- }
-
- SubMatrix b = m2.subMatrix(0, 3, 0, 2); // B : 4 * 3
- SubMatrix c = a.mult(b);
-
- double[][] C = new double[3][3]; // A * B
- for (int i = 0; i < 3; i++) {
- for (int j = 0; j < 3; j++) {
- for (int k = 0; k < 4; k++) {
- C[i][j] += m1.get(i + 2, k + 2) * m2.get(k, j);
- }
- }
- }
-
- for (int i = 0; i < 3; i++) {
- for (int j = 0; j < 3; j++) {
- assertEquals(C[i][j], c.get(i, j));
- }
- }
- }
-
- /**
- * Test matrices addition
- *
- * @throws IOException
- */
- public void testMatrixAdd() throws IOException {
- Matrix result = m1.add(m2);
-
- assertEquals(result.getRows(), SIZE);
- assertEquals(result.getColumns(), SIZE);
-
- for (int i = 0; i < SIZE; i++) {
- for (int j = 0; j < SIZE; j++) {
- assertEquals(result.get(i, j), m1.get(i, j) + m2.get(i, j));
- }
- }
- }
-
- /**
- * Test matrices multiplication
- *
- * @throws IOException
- */
- public void testMatrixMult() throws IOException {
- Matrix result = m1.mult(m2);
-
- assertEquals(result.getRows(), SIZE);
- assertEquals(result.getColumns(), SIZE);
-
- verifyMultResult(m1, m2, result);
- }
-
- public void testSetMatrix() throws IOException {
- Matrix a = new DenseMatrix(conf);
- a.set(m1);
-
- for (int i = 0; i < 5; i++) {
- int x = RandomVariable.randInt(0, 10);
- int y = RandomVariable.randInt(0, 10);
- assertEquals(a.get(x, y), m1.get(x, y));
- }
- }
-
- public void testSetAlphaMatrix() throws IOException {
- Matrix a = new DenseMatrix(conf);
- a.set(0.5, m1);
-
- for (int i = 0; i < 5; i++) {
- int x = RandomVariable.randInt(0, 10);
- int y = RandomVariable.randInt(0, 10);
- assertEquals(a.get(x, y), (m1.get(x, y) * 0.5));
- }
- }
-
- public void testAddAlphaMatrix() throws IOException {
- double value = m1.get(0, 0) + (m2.get(0, 0) * 0.1);
- Matrix result = m1.add(0.1, m2);
- assertEquals(value, result.get(0, 0));
- }
-
- public void testSetRow() throws IOException {
- Vector v = new DenseVector();
- double[] entries = new double[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 0 };
-
- for (int i = 0; i < SIZE; i++) {
- v.set(i, entries[i]);
- }
-
- m1.setRow(SIZE + 1, v);
- Iterator<DoubleEntry> it = m1.getRow(SIZE + 1).iterator();
-
- int i = 0;
- while (it.hasNext()) {
- assertEquals(entries[i], it.next().getValue());
- i++;
- }
- }
-
- public void testSetColumn() throws IOException {
- Vector v = new DenseVector();
- double[] entries = new double[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 0 };
-
- for (int i = 0; i < SIZE; i++) {
- v.set(i, entries[i]);
- }
-
- m1.setColumn(SIZE + 1, v);
- Iterator<DoubleEntry> it = m1.getColumn(SIZE + 1).iterator();
-
- int i = 0;
- while (it.hasNext()) {
- assertEquals(entries[i], it.next().getValue());
- i++;
- }
- }
-
- public void testLoadSave() throws IOException {
- String path1 = m1.getPath();
- // save m1 to aliase1
- m1.save(aliase1);
- // load matrix m1 using aliase1
- DenseMatrix loadTest = new DenseMatrix(conf, aliase1, false);
-
- for (int i = 0; i < loadTest.getRows(); i++) {
- for (int j = 0; j < loadTest.getColumns(); j++) {
- assertEquals(m1.get(i, j), loadTest.get(i, j));
- }
- }
-
- assertEquals(path1, loadTest.getPath());
- // close loadTest, it just disconnect to the table but didn't delete it.
- loadTest.close();
-
- // try to close m1 & load matrix m1 using aliase1 again.
- m1.close();
- DenseMatrix loadTest2 = new DenseMatrix(conf, aliase1, false);
- assertEquals(path1, loadTest2.getPath());
- // remove aliase1
- // because loadTest2 connect the aliase1, so we just remove aliase entry
- // but didn't delete the table.
- hamaAdmin.delete(aliase1);
- assertEquals(true, admin.tableExists(path1));
- // close loadTest2, because it is the last one who reference table 'path1'
- // it will do the gc!
- loadTest2.close();
- assertEquals(false, admin.tableExists(path1));
-
- // if we try to load non-existed matrix using aliase name, it should fail.
- DenseMatrix loadTest3 = null;
- try {
- loadTest3 = new DenseMatrix(conf, aliase1, false);
- fail("Try to load a non-existed matrix should fail!");
- } catch (IOException e) {
-
- } finally {
- if (loadTest3 != null)
- loadTest3.close();
- }
- }
-
- public void testForceCreate() throws IOException {
- String path2 = m2.getPath();
- // save m2 to aliase2
- m2.save(aliase2);
- // load matrix m2 using aliase2
- DenseMatrix loadTest = new DenseMatrix(conf, aliase2, false);
-
- for (int i = 0; i < loadTest.getRows(); i++) {
- for (int j = 0; j < loadTest.getColumns(); j++) {
- assertEquals(m2.get(i, j), loadTest.get(i, j));
- }
- }
-
- assertEquals(path2, loadTest.getPath());
-
- // force to create matrix loadTest2 using aliasename 'aliase2'
- DenseMatrix loadTest2 = new DenseMatrix(conf, aliase2, true);
- String loadPath2 = loadTest2.getPath();
- assertFalse(path2.equals(loadPath2));
- assertEquals(loadPath2, hamaAdmin.getPath(aliase2));
- assertFalse(path2.equals(hamaAdmin.getPath(aliase2)));
-
- // try to close m2 & loadTest, it table will be deleted finally
- m2.close();
- assertEquals(true, admin.tableExists(path2));
- loadTest.close();
- assertEquals(false, admin.tableExists(path2));
-
- // remove 'aliase2' & close loadTest2
- loadTest2.close();
- assertEquals(true, admin.tableExists(loadPath2));
- hamaAdmin.delete(aliase2);
- assertEquals(false, admin.tableExists(loadPath2));
- }
-
- /**
- * Verifying multiplication result
- *
- * @param m1
- * @param m2
- * @param result
- * @throws IOException
- */
- private void verifyMultResult(Matrix m1, Matrix m2, Matrix result)
- throws IOException {
- double[][] c = new double[SIZE][SIZE];
-
- for (int i = 0; i < SIZE; i++) {
- for (int j = 0; j < SIZE; j++) {
- for (int k = 0; k < SIZE; k++) {
- c[i][k] += m1.get(i, j) * m2.get(j, k);
- }
- }
- }
-
- for (int i = 0; i < SIZE; i++) {
- for (int j = 0; j < SIZE; j++) {
- double gap = (c[i][j] - result.get(i, j));
- assertTrue(gap < 0.000001 || gap < -0.000001);
- }
- }
- }
-}
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import junit.extensions.TestSetup;
+import junit.framework.Test;
+import junit.framework.TestCase;
+import junit.framework.TestSuite;
+
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.io.Writable;
+import org.apache.hama.io.DoubleEntry;
+import org.apache.hama.util.RandomVariable;
+import org.apache.log4j.Logger;
+
+/**
+ * Matrix test
+ */
+public class TestDenseMatrix extends TestCase {
+ static final Logger LOG = Logger.getLogger(TestDenseMatrix.class);
+ private static int SIZE = 10;
+ private static Matrix m1;
+ private static Matrix m2;
+ private final static String aliase1 = "matrix_aliase_A";
+ private final static String aliase2 = "matrix_aliase_B";
+ private static HamaConfiguration conf;
+ private static HBaseAdmin admin;
+ private static HamaAdmin hamaAdmin;
+
+ public static Test suite() {
+ TestSetup setup = new TestSetup(new TestSuite(TestDenseMatrix.class)) {
+ protected void setUp() throws Exception {
+ HCluster hCluster = new HCluster();
+ hCluster.setUp();
+
+ conf = hCluster.getConf();
+ admin = new HBaseAdmin(conf);
+ hamaAdmin = new HamaAdminImpl(conf, admin);
+
+ m1 = DenseMatrix.random(hCluster.getConf(), SIZE, SIZE);
+ m2 = DenseMatrix.random(hCluster.getConf(), SIZE, SIZE);
+ }
+
+ protected void tearDown() {
+ try {
+ closeTest();
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+ };
+ return setup;
+ }
+
+ public static void closeTest() throws IOException {
+ m1.close();
+ m2.close();
+ }
+
+ public void testEntryAdd() throws IOException {
+ double origin = m1.get(1, 1);
+ m1.add(1, 1, 0.5);
+
+ assertEquals(m1.get(1, 1), origin + 0.5);
+ }
+
+ /**
+ * Column vector test.
+ *
+ * @param rand
+ * @throws IOException
+ */
+ public void testGetColumn() throws IOException {
+ Vector v = m1.getColumn(0);
+ Iterator<Writable> it = v.iterator();
+ int x = 0;
+ while (it.hasNext()) {
+ assertEquals(m1.get(x, 0), ((DoubleEntry) it.next()).getValue());
+ x++;
+ }
+ }
+
+ public void testGetSetAttribute() throws IOException {
+ m1.setRowLabel(0, "row1");
+ assertEquals(m1.getRowLabel(0), "row1");
+ assertEquals(m1.getRowLabel(1), null);
+
+ m1.setColumnLabel(0, "column1");
+ assertEquals(m1.getColumnLabel(0), "column1");
+ assertEquals(m1.getColumnLabel(1), null);
+ }
+
+ public void testSubMatrix() throws IOException {
+ SubMatrix a = m1.subMatrix(2, 4, 2, 5); // A : 3 * 4
+ for (int i = 0; i < a.getRows(); i++) {
+ for (int j = 0; j < a.getColumns(); j++) {
+ assertEquals(a.get(i, j), m1.get(i + 2, j + 2));
+ }
+ }
+
+ SubMatrix b = m2.subMatrix(0, 3, 0, 2); // B : 4 * 3
+ SubMatrix c = a.mult(b);
+
+ double[][] C = new double[3][3]; // A * B
+ for (int i = 0; i < 3; i++) {
+ for (int j = 0; j < 3; j++) {
+ for (int k = 0; k < 4; k++) {
+ C[i][j] += m1.get(i + 2, k + 2) * m2.get(k, j);
+ }
+ }
+ }
+
+ for (int i = 0; i < 3; i++) {
+ for (int j = 0; j < 3; j++) {
+ assertEquals(C[i][j], c.get(i, j));
+ }
+ }
+ }
+
+ /**
+ * Test matrices addition
+ *
+ * @throws IOException
+ */
+ public void testMatrixAdd() throws IOException {
+ Matrix result = m1.add(m2);
+
+ assertEquals(result.getRows(), SIZE);
+ assertEquals(result.getColumns(), SIZE);
+
+ for (int i = 0; i < SIZE; i++) {
+ for (int j = 0; j < SIZE; j++) {
+ assertEquals(result.get(i, j), m1.get(i, j) + m2.get(i, j));
+ }
+ }
+ }
+
+ /**
+ * Test matrices multiplication
+ *
+ * @throws IOException
+ */
+ public void testMatrixMult() throws IOException {
+ Matrix result = m1.mult(m2);
+
+ assertEquals(result.getRows(), SIZE);
+ assertEquals(result.getColumns(), SIZE);
+
+ verifyMultResult(m1, m2, result);
+ }
+
+ public void testSetMatrix() throws IOException {
+ Matrix a = new DenseMatrix(conf);
+ a.set(m1);
+
+ for (int i = 0; i < 5; i++) {
+ int x = RandomVariable.randInt(0, 10);
+ int y = RandomVariable.randInt(0, 10);
+ assertEquals(a.get(x, y), m1.get(x, y));
+ }
+ }
+
+ public void testSetAlphaMatrix() throws IOException {
+ Matrix a = new DenseMatrix(conf);
+ a.set(0.5, m1);
+
+ for (int i = 0; i < 5; i++) {
+ int x = RandomVariable.randInt(0, 10);
+ int y = RandomVariable.randInt(0, 10);
+ assertEquals(a.get(x, y), (m1.get(x, y) * 0.5));
+ }
+ }
+
+ public void testAddAlphaMatrix() throws IOException {
+ double value = m1.get(0, 0) + (m2.get(0, 0) * 0.1);
+ Matrix result = m1.add(0.1, m2);
+ assertEquals(value, result.get(0, 0));
+ }
+
+ public void testSetRow() throws IOException {
+ Vector v = new DenseVector();
+ double[] entries = new double[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 0 };
+
+ for (int i = 0; i < SIZE; i++) {
+ v.set(i, entries[i]);
+ }
+
+ m1.setRow(SIZE + 1, v);
+ Iterator<Writable> it = m1.getRow(SIZE + 1).iterator();
+
+ int i = 0;
+ while (it.hasNext()) {
+ assertEquals(entries[i], ((DoubleEntry) it.next()).getValue());
+ i++;
+ }
+ }
+
+ public void testSetColumn() throws IOException {
+ Vector v = new DenseVector();
+ double[] entries = new double[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 0 };
+
+ for (int i = 0; i < SIZE; i++) {
+ v.set(i, entries[i]);
+ }
+
+ m1.setColumn(SIZE + 1, v);
+ Iterator<Writable> it = m1.getColumn(SIZE + 1).iterator();
+
+ int i = 0;
+ while (it.hasNext()) {
+ assertEquals(entries[i], ((DoubleEntry) it.next()).getValue());
+ i++;
+ }
+ }
+
+ public void testLoadSave() throws IOException {
+ String path1 = m1.getPath();
+ // save m1 to aliase1
+ m1.save(aliase1);
+ // load matrix m1 using aliase1
+ DenseMatrix loadTest = new DenseMatrix(conf, aliase1, false);
+
+ for (int i = 0; i < loadTest.getRows(); i++) {
+ for (int j = 0; j < loadTest.getColumns(); j++) {
+ assertEquals(m1.get(i, j), loadTest.get(i, j));
+ }
+ }
+
+ assertEquals(path1, loadTest.getPath());
+ // close loadTest, it just disconnect to the table but didn't delete it.
+ loadTest.close();
+
+ // try to close m1 & load matrix m1 using aliase1 again.
+ m1.close();
+ DenseMatrix loadTest2 = new DenseMatrix(conf, aliase1, false);
+ assertEquals(path1, loadTest2.getPath());
+ // remove aliase1
+ // because loadTest2 connect the aliase1, so we just remove aliase entry
+ // but didn't delete the table.
+ hamaAdmin.delete(aliase1);
+ assertEquals(true, admin.tableExists(path1));
+ // close loadTest2, because it is the last one who reference table 'path1'
+ // it will do the gc!
+ loadTest2.close();
+ assertEquals(false, admin.tableExists(path1));
+
+ // if we try to load non-existed matrix using aliase name, it should fail.
+ DenseMatrix loadTest3 = null;
+ try {
+ loadTest3 = new DenseMatrix(conf, aliase1, false);
+ fail("Try to load a non-existed matrix should fail!");
+ } catch (IOException e) {
+
+ } finally {
+ if (loadTest3 != null)
+ loadTest3.close();
+ }
+ }
+
+ public void testForceCreate() throws IOException {
+ String path2 = m2.getPath();
+ // save m2 to aliase2
+ m2.save(aliase2);
+ // load matrix m2 using aliase2
+ DenseMatrix loadTest = new DenseMatrix(conf, aliase2, false);
+
+ for (int i = 0; i < loadTest.getRows(); i++) {
+ for (int j = 0; j < loadTest.getColumns(); j++) {
+ assertEquals(m2.get(i, j), loadTest.get(i, j));
+ }
+ }
+
+ assertEquals(path2, loadTest.getPath());
+
+ // force to create matrix loadTest2 using aliasename 'aliase2'
+ DenseMatrix loadTest2 = new DenseMatrix(conf, aliase2, true);
+ String loadPath2 = loadTest2.getPath();
+ assertFalse(path2.equals(loadPath2));
+ assertEquals(loadPath2, hamaAdmin.getPath(aliase2));
+ assertFalse(path2.equals(hamaAdmin.getPath(aliase2)));
+
+ // try to close m2 & loadTest, it table will be deleted finally
+ m2.close();
+ assertEquals(true, admin.tableExists(path2));
+ loadTest.close();
+ assertEquals(false, admin.tableExists(path2));
+
+ // remove 'aliase2' & close loadTest2
+ loadTest2.close();
+ assertEquals(true, admin.tableExists(loadPath2));
+ hamaAdmin.delete(aliase2);
+ assertEquals(false, admin.tableExists(loadPath2));
+ }
+
+ /**
+ * Verifying multiplication result
+ *
+ * @param m1
+ * @param m2
+ * @param result
+ * @throws IOException
+ */
+ private void verifyMultResult(Matrix m1, Matrix m2, Matrix result)
+ throws IOException {
+ double[][] c = new double[SIZE][SIZE];
+
+ for (int i = 0; i < SIZE; i++) {
+ for (int j = 0; j < SIZE; j++) {
+ for (int k = 0; k < SIZE; k++) {
+ c[i][k] += m1.get(i, j) * m2.get(j, k);
+ }
+ }
+ }
+
+ for (int i = 0; i < SIZE; i++) {
+ for (int j = 0; j < SIZE; j++) {
+ double gap = (c[i][j] - result.get(i, j));
+ assertTrue(gap < 0.000001 || gap < -0.000001);
+ }
+ }
+ }
+}
Modified: incubator/hama/trunk/src/test/org/apache/hama/TestDenseVector.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/test/org/apache/hama/TestDenseVector.java?rev=744798&r1=744797&r2=744798&view=diff
==============================================================================
--- incubator/hama/trunk/src/test/org/apache/hama/TestDenseVector.java (original)
+++ incubator/hama/trunk/src/test/org/apache/hama/TestDenseVector.java Mon Feb 16 04:35:23 2009
@@ -1,158 +1,159 @@
-/**
- * Copyright 2007 The Apache Software Foundation
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hama;
-
-import java.io.IOException;
-import java.util.Iterator;
-
-import junit.extensions.TestSetup;
-import junit.framework.Test;
-import junit.framework.TestCase;
-import junit.framework.TestSuite;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hama.io.DoubleEntry;
-
-public class TestDenseVector extends TestCase {
- final static Log LOG = LogFactory.getLog(TestDenseVector.class.getName());
-
- private static final double cosine = 0.6978227007909176;
- private static final double norm1 = 12.0;
- private static final double norm2 = 6.782329983125268;
- private static double[][] values = { { 2, 5, 1, 4 }, { 4, 1, 3, 3 } };
- private static Matrix m1;
- private static Vector v1;
- private static Vector v2;
-
- public static Test suite() {
- TestSetup setup = new TestSetup(new TestSuite(TestDenseVector.class)) {
- protected void setUp() throws Exception {
- HCluster hCluster = new HCluster();
- hCluster.setUp();
-
- m1 = new DenseMatrix(hCluster.getConf());
-
- for (int i = 0; i < 2; i++)
- for (int j = 0; j < 4; j++)
- m1.set(i, j, values[i][j]);
-
- v1 = m1.getRow(0);
- v2 = m1.getRow(1);
- }
-
- protected void tearDown() {
- LOG.info("tearDown()");
- }
- };
- return setup;
- }
-
- /**
- * Test |a| dot |b|
- */
- public void testDot() {
- double cos = v1.dot(v2);
- assertEquals(cos, cosine);
- }
-
- public void testSubVector() {
- int start = 2;
- Vector subVector = v1.subVector(start, v1.size() - 1);
- Iterator<DoubleEntry> it = subVector.iterator();
-
- int i = start;
- while (it.hasNext()) {
- assertEquals(v1.get(i), it.next().getValue());
- i++;
- }
- }
-
- /**
- * Test norm one
- */
- public void testNom1() {
- double result = ((DenseVector) v1).getNorm1();
- assertEquals(norm1, result);
- }
-
- /**
- * Test norm two
- */
- public void testNom2() {
- double result = ((DenseVector) v1).getNorm2();
- assertEquals(norm2, result);
- }
-
- /**
- * Test scaling
- */
- public void scalingTest() {
- v2.scale(0.5);
-
- for (int i = 0; i < v2.size(); i++) {
- assertEquals(values[1][i] * 0.5, v2.get(i));
- }
- }
-
- /**
- * Test get/set methods
- * @throws IOException
- */
- public void testGetSet() throws IOException {
- assertEquals(v1.get(0), values[0][0]);
- assertEquals(m1.getColumn(0).size(), 2);
- }
-
- /**
- * Test add()
- */
- public void testAdd() {
- v1.add(v2);
- int i = 0;
- Iterator<DoubleEntry> it = v1.iterator();
- while (it.hasNext()) {
- DoubleEntry c = it.next();
- assertEquals(c.getValue(), values[0][i] + values[1][i]);
- i++;
- }
-
- v1.add(0.5, v2);
- int j = 0;
- Iterator<DoubleEntry> itt = v1.iterator();
- while (itt.hasNext()) {
- DoubleEntry c = itt.next();
- assertEquals(c.getValue(), (values[0][j] + values[1][j]) + (0.5 * values[1][j]));
- j++;
- }
-
- double old = v1.get(0);
- v1.add(0, norm1);
- assertEquals(v1.get(0), old + norm1);
- }
-
- /**
- * Clear test
- */
- public void testClear() {
- ((DenseVector) v1).clear();
- assertEquals(v1.size(), 0);
- }
-}
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import junit.extensions.TestSetup;
+import junit.framework.Test;
+import junit.framework.TestCase;
+import junit.framework.TestSuite;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.Writable;
+import org.apache.hama.io.DoubleEntry;
+
+public class TestDenseVector extends TestCase {
+ final static Log LOG = LogFactory.getLog(TestDenseVector.class.getName());
+
+ private static final double cosine = 0.6978227007909176;
+ private static final double norm1 = 12.0;
+ private static final double norm2 = 6.782329983125268;
+ private static double[][] values = { { 2, 5, 1, 4 }, { 4, 1, 3, 3 } };
+ private static Matrix m1;
+ private static Vector v1;
+ private static Vector v2;
+
+ public static Test suite() {
+ TestSetup setup = new TestSetup(new TestSuite(TestDenseVector.class)) {
+ protected void setUp() throws Exception {
+ HCluster hCluster = new HCluster();
+ hCluster.setUp();
+
+ m1 = new DenseMatrix(hCluster.getConf());
+
+ for (int i = 0; i < 2; i++)
+ for (int j = 0; j < 4; j++)
+ m1.set(i, j, values[i][j]);
+
+ v1 = m1.getRow(0);
+ v2 = m1.getRow(1);
+ }
+
+ protected void tearDown() {
+ LOG.info("tearDown()");
+ }
+ };
+ return setup;
+ }
+
+ /**
+ * Test |a| dot |b|
+ */
+ public void testDot() {
+ double cos = v1.dot(v2);
+ assertEquals(cos, cosine);
+ }
+
+ public void testSubVector() {
+ int start = 2;
+ Vector subVector = v1.subVector(start, v1.size() - 1);
+ Iterator<Writable> it = subVector.iterator();
+
+ int i = start;
+ while (it.hasNext()) {
+ assertEquals(v1.get(i), ((DoubleEntry) it.next()).getValue());
+ i++;
+ }
+ }
+
+ /**
+ * Test norm one
+ */
+ public void testNom1() {
+ double result = ((DenseVector) v1).getNorm1();
+ assertEquals(norm1, result);
+ }
+
+ /**
+ * Test norm two
+ */
+ public void testNom2() {
+ double result = ((DenseVector) v1).getNorm2();
+ assertEquals(norm2, result);
+ }
+
+ /**
+ * Test scaling. NOTE(review): method name lacks the "test" prefix, so JUnit 3 never runs it — confirm this is intentional.
+ */
+ public void scalingTest() {
+ v2.scale(0.5);
+
+ for (int i = 0; i < v2.size(); i++) {
+ assertEquals(values[1][i] * 0.5, v2.get(i));
+ }
+ }
+
+ /**
+ * Test get/set methods
+ * @throws IOException
+ */
+ public void testGetSet() throws IOException {
+ assertEquals(v1.get(0), values[0][0]);
+ assertEquals(m1.getColumn(0).size(), 2);
+ }
+
+ /**
+ * Test add()
+ */
+ public void testAdd() {
+ v1.add(v2);
+ int i = 0;
+ Iterator<Writable> it = v1.iterator();
+ while (it.hasNext()) {
+ DoubleEntry c = (DoubleEntry) it.next();
+ assertEquals(c.getValue(), values[0][i] + values[1][i]);
+ i++;
+ }
+
+ v1.add(0.5, v2);
+ int j = 0;
+ Iterator<Writable> itt = v1.iterator();
+ while (itt.hasNext()) {
+ DoubleEntry c = (DoubleEntry) itt.next();
+ assertEquals(c.getValue(), (values[0][j] + values[1][j]) + (0.5 * values[1][j]));
+ j++;
+ }
+
+ double old = v1.get(0);
+ v1.add(0, norm1);
+ assertEquals(v1.get(0), old + norm1);
+ }
+
+ /**
+ * Clear test
+ */
+ public void testClear() {
+ ((DenseVector) v1).clear();
+ assertEquals(v1.size(), 0);
+ }
+}
Modified: incubator/hama/trunk/src/test/org/apache/hama/examples/TestFileMatrixBlockMult.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/test/org/apache/hama/examples/TestFileMatrixBlockMult.java?rev=744798&r1=744797&r2=744798&view=diff
==============================================================================
--- incubator/hama/trunk/src/test/org/apache/hama/examples/TestFileMatrixBlockMult.java (original)
+++ incubator/hama/trunk/src/test/org/apache/hama/examples/TestFileMatrixBlockMult.java Mon Feb 16 04:35:23 2009
@@ -51,7 +51,6 @@
import org.apache.hama.algebra.BlockMultiplyReduce;
import org.apache.hama.io.BlockID;
import org.apache.hama.io.BlockWritable;
-import org.apache.hama.io.VectorWritable;
import org.apache.hama.mapred.CollectBlocksMap;
import org.apache.hama.mapred.CollectBlocksMapReduceBase;
import org.apache.hama.util.JobManager;
@@ -175,33 +174,31 @@
@Override
public void map(IntWritable key, MapWritable value,
- OutputCollector<BlockID, VectorWritable> output, Reporter reporter)
+ OutputCollector<BlockID, MapWritable> output, Reporter reporter)
throws IOException {
- int startColumn;
- int endColumn;
- int blkRow = key.get() / mBlockRowSize;
+ int startColumn, endColumn, blkRow = key.get() / mBlockRowSize, i = 0;
this.value = value;
-
- int i = 0;
+
do {
startColumn = i * mBlockColSize;
endColumn = startColumn + mBlockColSize - 1;
if (endColumn >= mColumns) // the last sub vector
endColumn = mColumns - 1;
- output.collect(new BlockID(blkRow, i), new VectorWritable(key.get(),
- subVector(startColumn, endColumn)));
+ output.collect(new BlockID(blkRow, i), subVector(key.get(), startColumn, endColumn));
i++;
} while (endColumn < (mColumns - 1));
}
- private DenseVector subVector(int i0, int i1) {
+ private MapWritable subVector(int row, int i0, int i1) {
DenseVector res = new DenseVector();
+ res.setRow(row);
+
for (int i = i0; i <= i1; i++) {
res.set(i, ((DoubleWritable) this.value.get(new IntWritable(i))).get());
}
- return res;
+ return res.getEntries();
}
}
}
Modified: incubator/hama/trunk/src/test/org/apache/hama/mapred/TestMatrixMapReduce.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/test/org/apache/hama/mapred/TestMatrixMapReduce.java?rev=744798&r1=744797&r2=744798&view=diff
==============================================================================
--- incubator/hama/trunk/src/test/org/apache/hama/mapred/TestMatrixMapReduce.java (original)
+++ incubator/hama/trunk/src/test/org/apache/hama/mapred/TestMatrixMapReduce.java Mon Feb 16 04:35:23 2009
@@ -1,79 +1,79 @@
-/**
- * Copyright 2007 The Apache Software Foundation
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hama.mapred;
-
-import java.io.IOException;
-
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.mapred.JobClient;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hama.DenseMatrix;
-import org.apache.hama.HCluster;
-import org.apache.hama.Matrix;
-import org.apache.hama.algebra.RowCyclicAdditionMap;
-import org.apache.hama.algebra.RowCyclicAdditionReduce;
-import org.apache.hama.io.VectorWritable;
-import org.apache.log4j.Logger;
-
-/**
- * Test Matrix Map/Reduce job
- */
-public class TestMatrixMapReduce extends HCluster {
- static final Logger LOG = Logger.getLogger(TestMatrixMapReduce.class);
-
- /** constructor */
- public TestMatrixMapReduce() {
- super();
- }
-
- public void testMatrixMapReduce() throws IOException {
- Matrix matrixA = new DenseMatrix(conf);
- matrixA.set(0, 0, 1);
- matrixA.set(0, 1, 0);
- matrixA.setDimension(1, 2);
-
- Matrix matrixB = new DenseMatrix(conf);
- matrixB.set(0, 0, 1);
- matrixB.set(0, 1, 1);
- matrixB.setDimension(1, 2);
-
- miniMRJob(matrixA.getPath(), matrixB.getPath());
- }
-
- private void miniMRJob(String string, String string2) throws IOException {
- Matrix c = new DenseMatrix(conf);
- String output = c.getPath();
-
- JobConf jobConf = new JobConf(conf, TestMatrixMapReduce.class);
- jobConf.setJobName("test MR job");
-
- RowCyclicAdditionMap.initJob(string, string2, RowCyclicAdditionMap.class, IntWritable.class,
- VectorWritable.class, jobConf);
- RowCyclicAdditionReduce.initJob(output, RowCyclicAdditionReduce.class, jobConf);
-
- jobConf.setNumMapTasks(2);
- jobConf.setNumReduceTasks(2);
-
- JobClient.runJob(jobConf);
-
- assertEquals(c.get(0, 0), 2.0);
- assertEquals(c.get(0, 1), 1.0);
- }
-}
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.mapred;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.MapWritable;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hama.DenseMatrix;
+import org.apache.hama.HCluster;
+import org.apache.hama.Matrix;
+import org.apache.hama.algebra.RowCyclicAdditionMap;
+import org.apache.hama.algebra.RowCyclicAdditionReduce;
+import org.apache.log4j.Logger;
+
+/**
+ * Test Matrix Map/Reduce job
+ */
+public class TestMatrixMapReduce extends HCluster {
+ static final Logger LOG = Logger.getLogger(TestMatrixMapReduce.class);
+
+ /** constructor */
+ public TestMatrixMapReduce() {
+ super();
+ }
+
+ public void testMatrixMapReduce() throws IOException {
+ Matrix matrixA = new DenseMatrix(conf);
+ matrixA.set(0, 0, 1);
+ matrixA.set(0, 1, 0);
+ matrixA.setDimension(1, 2);
+
+ Matrix matrixB = new DenseMatrix(conf);
+ matrixB.set(0, 0, 1);
+ matrixB.set(0, 1, 1);
+ matrixB.setDimension(1, 2);
+
+ miniMRJob(matrixA.getPath(), matrixB.getPath());
+ }
+
+ private void miniMRJob(String string, String string2) throws IOException {
+ Matrix c = new DenseMatrix(conf);
+ String output = c.getPath();
+
+ JobConf jobConf = new JobConf(conf, TestMatrixMapReduce.class);
+ jobConf.setJobName("test MR job");
+
+ RowCyclicAdditionMap.initJob(string, string2, RowCyclicAdditionMap.class, IntWritable.class,
+ MapWritable.class, jobConf);
+ RowCyclicAdditionReduce.initJob(output, RowCyclicAdditionReduce.class, jobConf);
+
+ jobConf.setNumMapTasks(2);
+ jobConf.setNumReduceTasks(2);
+
+ JobClient.runJob(jobConf);
+
+ assertEquals(c.get(0, 0), 2.0);
+ assertEquals(c.get(0, 1), 1.0);
+ }
+}