You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ap...@apache.org on 2013/08/06 19:32:52 UTC
git commit: updated refs/heads/trunk to fa6e3d5
Updated Branches:
refs/heads/trunk d419f8f4f -> fa6e3d5a8
GIRAPH-728: Efficient matrix aggregators (herald via apresta)
Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/fa6e3d5a
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/fa6e3d5a
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/fa6e3d5a
Branch: refs/heads/trunk
Commit: fa6e3d5a8b25616e31cf355f2cb5a7001f4c8f64
Parents: d419f8f
Author: Alessandro Presta <al...@fb.com>
Authored: Tue Aug 6 10:31:59 2013 -0700
Committer: Alessandro Presta <al...@fb.com>
Committed: Tue Aug 6 10:31:59 2013 -0700
----------------------------------------------------------------------
.../giraph/aggregators/matrix/DoubleMatrix.java | 104 ++++++++++++++++
.../matrix/DoubleMatrixSumAggregator.java | 101 +++++++++++++++
.../giraph/aggregators/matrix/DoubleVector.java | 123 +++++++++++++++++++
.../matrix/DoubleVectorSumAggregator.java | 37 ++++++
.../giraph/aggregators/matrix/FloatMatrix.java | 104 ++++++++++++++++
.../matrix/FloatMatrixSumAggregator.java | 100 +++++++++++++++
.../giraph/aggregators/matrix/FloatVector.java | 123 +++++++++++++++++++
.../matrix/FloatVectorSumAggregator.java | 37 ++++++
.../giraph/aggregators/matrix/IntMatrix.java | 104 ++++++++++++++++
.../matrix/IntMatrixSumAggregator.java | 100 +++++++++++++++
.../giraph/aggregators/matrix/IntVector.java | 123 +++++++++++++++++++
.../matrix/IntVectorSumAggregator.java | 37 ++++++
.../giraph/aggregators/matrix/LongMatrix.java | 104 ++++++++++++++++
.../matrix/LongMatrixSumAggregator.java | 100 +++++++++++++++
.../giraph/aggregators/matrix/LongVector.java | 123 +++++++++++++++++++
.../matrix/LongVectorSumAggregator.java | 37 ++++++
.../aggregators/matrix/MatrixSumAggregator.java | 51 ++++++++
.../giraph/aggregators/matrix/package-info.java | 21 ++++
.../aggregators/matrix/TestDoubleMatrix.java | 74 +++++++++++
.../aggregators/matrix/TestFloatMatrix.java | 74 +++++++++++
.../aggregators/matrix/TestIntMatrix.java | 73 +++++++++++
.../aggregators/matrix/TestLongMatrix.java | 73 +++++++++++
22 files changed, 1823 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/giraph/blob/fa6e3d5a/giraph-core/src/main/java/org/apache/giraph/aggregators/matrix/DoubleMatrix.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/aggregators/matrix/DoubleMatrix.java b/giraph-core/src/main/java/org/apache/giraph/aggregators/matrix/DoubleMatrix.java
new file mode 100644
index 0000000..d86dc4b
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/aggregators/matrix/DoubleMatrix.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.aggregators.matrix;
+
+import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
+
+/**
+ * Sparse matrix of doubles stored as one {@link DoubleVector} per row, with
+ * the row vectors held in a map keyed by row index.
+ */
+public class DoubleMatrix {
+  /** Row count fixed at construction time */
+  private int numRows;
+  /** Row vectors keyed by row index */
+  private Int2ObjectOpenHashMap<DoubleVector> rows;
+
+  /**
+   * Build a matrix for the given number of rows. No row vector is stored
+   * until {@link #initialize()} or {@code setRow} provides one.
+   * @param numRows the number of rows.
+   */
+  public DoubleMatrix(int numRows) {
+    this.rows = new Int2ObjectOpenHashMap<DoubleVector>(numRows);
+    this.rows.defaultReturnValue(null);
+    this.numRows = numRows;
+  }
+
+  /**
+   * Discard all stored rows and give every row a new, empty vector.
+   */
+  public void initialize() {
+    rows.clear();
+    for (int row = 0; row < numRows; row++) {
+      setRow(row, new DoubleVector());
+    }
+  }
+
+  /**
+   * Report how many rows this matrix was created with.
+   *
+   * @return the number of rows.
+   */
+  public int getNumRows() {
+    return this.numRows;
+  }
+
+  /**
+   * Look up a single entry; the row must already hold a vector.
+   * @param i the row
+   * @param j the column
+   * @return the value of the entry
+   */
+  public double get(int i, int j) {
+    DoubleVector rowVector = rows.get(i);
+    return rowVector.get(j);
+  }
+
+  /**
+   * Overwrite a single entry; the row must already hold a vector.
+   * @param i the row
+   * @param j the column
+   * @param v the value of the entry
+   */
+  public void set(int i, int j, double v) {
+    DoubleVector rowVector = rows.get(i);
+    rowVector.set(j, v);
+  }
+
+  /**
+   * Fetch the vector stored for a row.
+   *
+   * @param i the row number
+   * @return the row vector, or null when none is stored for that row
+   */
+  DoubleVector getRow(int i) {
+    return this.rows.get(i);
+  }
+
+  /**
+   * Store the given vector as a row, replacing any previous one.
+   *
+   * @param i the row
+   * @param vec the vector to set as the row
+   */
+  void setRow(int i, DoubleVector vec) {
+    this.rows.put(i, vec);
+  }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/fa6e3d5a/giraph-core/src/main/java/org/apache/giraph/aggregators/matrix/DoubleMatrixSumAggregator.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/aggregators/matrix/DoubleMatrixSumAggregator.java b/giraph-core/src/main/java/org/apache/giraph/aggregators/matrix/DoubleMatrixSumAggregator.java
new file mode 100644
index 0000000..0a1dafb
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/aggregators/matrix/DoubleMatrixSumAggregator.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.aggregators.matrix;
+
+import org.apache.giraph.aggregators.AggregatorUsage;
+import org.apache.giraph.master.MasterAggregatorUsage;
+import org.apache.giraph.worker.WorkerAggregatorUsage;
+
+/**
+ * Sums double matrices by keeping one {@link DoubleVectorSumAggregator} per
+ * matrix row, each registered under a name derived from the row index.
+ */
+public class DoubleMatrixSumAggregator extends MatrixSumAggregator {
+  /** Scratch vector reused by aggregate() to carry one (column, value) */
+  private DoubleVector singletonVector = new DoubleVector();
+
+  /**
+   * Build a matrix aggregator whose row aggregators share the given name
+   * prefix.
+   *
+   * @param name the prefix for the row vector aggregators
+   */
+  public DoubleMatrixSumAggregator(String name) {
+    super(name);
+  }
+
+  /**
+   * Register one double vector sum aggregator per matrix row.
+   *
+   * @param numRows the number of rows
+   * @param master the master to register the aggregators
+   */
+  public void register(int numRows, MasterAggregatorUsage master)
+    throws InstantiationException, IllegalAccessException {
+    for (int row = 0; row < numRows; row++) {
+      String rowName = getRowAggregatorName(row);
+      master.registerAggregator(rowName, DoubleVectorSumAggregator.class);
+    }
+  }
+
+  /**
+   * Add a value to one matrix entry through the row's aggregator.
+   * @param i the row
+   * @param j the column
+   * @param v the value
+   * @param worker the worker to aggregate
+   */
+  public void aggregate(int i, int j, double v, WorkerAggregatorUsage worker) {
+    singletonVector.clear();
+    singletonVector.set(j, v);
+    String rowName = getRowAggregatorName(i);
+    worker.aggregate(rowName, singletonVector);
+  }
+
+  /**
+   * Push every row of the given matrix into its row aggregator as the
+   * aggregated value. Typically used on the master, which builds a
+   * DoubleMatrix externally and only sets it once complete.
+   *
+   * @param matrix the matrix to set the values
+   * @param master the master
+   */
+  public void setMatrix(DoubleMatrix matrix, MasterAggregatorUsage master) {
+    for (int row = 0, end = matrix.getNumRows(); row < end; row++) {
+      String rowName = getRowAggregatorName(row);
+      master.setAggregatedValue(rowName, matrix.getRow(row));
+    }
+  }
+
+  /**
+   * Assemble a matrix from the current values of the row aggregators.
+   * @param numRows the number of rows
+   * @param aggUser the master or worker
+   * @return the double matrix
+   */
+  public DoubleMatrix getMatrix(int numRows, AggregatorUsage aggUser) {
+    DoubleMatrix result = new DoubleMatrix(numRows);
+    for (int row = 0; row < numRows; row++) {
+      String rowName = getRowAggregatorName(row);
+      DoubleVector rowVector = aggUser.getAggregatedValue(rowName);
+      result.setRow(row, rowVector);
+    }
+    return result;
+  }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/fa6e3d5a/giraph-core/src/main/java/org/apache/giraph/aggregators/matrix/DoubleVector.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/aggregators/matrix/DoubleVector.java b/giraph-core/src/main/java/org/apache/giraph/aggregators/matrix/DoubleVector.java
new file mode 100644
index 0000000..288be93
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/aggregators/matrix/DoubleVector.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.aggregators.matrix;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Map.Entry;
+
+import it.unimi.dsi.fastutil.ints.Int2DoubleOpenHashMap;
+
+import org.apache.hadoop.io.Writable;
+
+/**
+ * Sparse vector of doubles that holds the values of one matrix row.
+ */
+public class DoubleVector implements Writable {
+  /**
+   * Entries are (key, value) pairs of the form (column index, value)
+   */
+  private Int2DoubleOpenHashMap entries;
+
+  /**
+   * Create a new vector with default size.
+   */
+  public DoubleVector() {
+    initialize(Int2DoubleOpenHashMap.DEFAULT_INITIAL_SIZE);
+  }
+
+  /**
+   * Create a new vector with given size.
+   *
+   * @param size the initial size handed to the backing hash map
+   */
+  public DoubleVector(int size) {
+    initialize(size);
+  }
+
+  /**
+   * Replace the backing map with an empty one. Absent entries read as 0.0
+   *
+   * @param size the initial size handed to the backing hash map
+   */
+  private void initialize(int size) {
+    entries = new Int2DoubleOpenHashMap(size);
+    entries.defaultReturnValue(0.0);
+  }
+
+  /**
+   * Get a particular entry of the vector.
+   *
+   * @param i the index of the entry
+   * @return the stored value, or 0.0 if no value is stored for the entry
+   */
+  double get(int i) {
+    return entries.get(i);
+  }
+
+  /**
+   * Set the given value to the entry specified.
+   *
+   * @param i the index of the entry
+   * @param value the value to set to the entry
+   */
+  void set(int i, double value) {
+    entries.put(i, value);
+  }
+
+  /** Clear the contents of the vector. */
+  void clear() {
+    entries.clear();
+  }
+
+  /**
+   * Add the vector specified. Each entry of the other vector is added to the
+   * entry with the same index here; indices missing here start from 0.0
+   *
+   * @param other the vector to add.
+   */
+  void add(DoubleVector other) {
+    for (Entry<Integer, Double> entry : other.entries.entrySet()) {
+      entries.addTo(entry.getKey(), entry.getValue());
+    }
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    // Wire format: entry count, then one (int index, double value) per entry
+    out.writeInt(entries.size());
+    for (Entry<Integer, Double> entry : entries.entrySet()) {
+      out.writeInt(entry.getKey());
+      out.writeDouble(entry.getValue());
+    }
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    // Discards current contents, then reads the format produced by write()
+    int size = in.readInt();
+    initialize(size);
+    for (int i = 0; i < size; ++i) {
+      int index = in.readInt();
+      double value = in.readDouble();
+      entries.put(index, value);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/fa6e3d5a/giraph-core/src/main/java/org/apache/giraph/aggregators/matrix/DoubleVectorSumAggregator.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/aggregators/matrix/DoubleVectorSumAggregator.java b/giraph-core/src/main/java/org/apache/giraph/aggregators/matrix/DoubleVectorSumAggregator.java
new file mode 100644
index 0000000..3318554
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/aggregators/matrix/DoubleVectorSumAggregator.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.aggregators.matrix;
+
+import org.apache.giraph.aggregators.BasicAggregator;
+
+/**
+ * Aggregator that sums double vectors entry by entry.
+ */
+public class DoubleVectorSumAggregator extends BasicAggregator<DoubleVector> {
+  @Override
+  public void aggregate(DoubleVector vector) {
+    DoubleVector runningSum = getAggregatedValue();
+    runningSum.add(vector);
+  }
+
+  @Override
+  public DoubleVector createInitialValue() {
+    return new DoubleVector();
+  }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/fa6e3d5a/giraph-core/src/main/java/org/apache/giraph/aggregators/matrix/FloatMatrix.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/aggregators/matrix/FloatMatrix.java b/giraph-core/src/main/java/org/apache/giraph/aggregators/matrix/FloatMatrix.java
new file mode 100644
index 0000000..67bad5c
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/aggregators/matrix/FloatMatrix.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.aggregators.matrix;
+
+import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
+
+/**
+ * Sparse matrix of floats stored as one {@link FloatVector} per row, with
+ * the row vectors held in a map keyed by row index.
+ */
+public class FloatMatrix {
+  /** Row count fixed at construction time */
+  private int numRows;
+  /** Row vectors keyed by row index */
+  private Int2ObjectOpenHashMap<FloatVector> rows;
+
+  /**
+   * Build a matrix for the given number of rows. No row vector is stored
+   * until {@link #initialize()} or {@code setRow} provides one.
+   * @param numRows the number of rows.
+   */
+  public FloatMatrix(int numRows) {
+    this.rows = new Int2ObjectOpenHashMap<FloatVector>(numRows);
+    this.rows.defaultReturnValue(null);
+    this.numRows = numRows;
+  }
+
+  /**
+   * Discard all stored rows and give every row a new, empty vector.
+   */
+  public void initialize() {
+    rows.clear();
+    for (int row = 0; row < numRows; row++) {
+      setRow(row, new FloatVector());
+    }
+  }
+
+  /**
+   * Report how many rows this matrix was created with.
+   *
+   * @return the number of rows.
+   */
+  public int getNumRows() {
+    return this.numRows;
+  }
+
+  /**
+   * Look up a single entry; the row must already hold a vector.
+   * @param i the row
+   * @param j the column
+   * @return the value of the entry
+   */
+  public float get(int i, int j) {
+    FloatVector rowVector = rows.get(i);
+    return rowVector.get(j);
+  }
+
+  /**
+   * Overwrite a single entry; the row must already hold a vector.
+   * @param i the row
+   * @param j the column
+   * @param v the value of the entry
+   */
+  public void set(int i, int j, float v) {
+    FloatVector rowVector = rows.get(i);
+    rowVector.set(j, v);
+  }
+
+  /**
+   * Fetch the vector stored for a row.
+   *
+   * @param i the row number
+   * @return the row vector, or null when none is stored for that row
+   */
+  FloatVector getRow(int i) {
+    return this.rows.get(i);
+  }
+
+  /**
+   * Store the given vector as a row, replacing any previous one.
+   *
+   * @param i the row
+   * @param vec the vector to set as the row
+   */
+  void setRow(int i, FloatVector vec) {
+    this.rows.put(i, vec);
+  }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/fa6e3d5a/giraph-core/src/main/java/org/apache/giraph/aggregators/matrix/FloatMatrixSumAggregator.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/aggregators/matrix/FloatMatrixSumAggregator.java b/giraph-core/src/main/java/org/apache/giraph/aggregators/matrix/FloatMatrixSumAggregator.java
new file mode 100644
index 0000000..54406ed
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/aggregators/matrix/FloatMatrixSumAggregator.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.aggregators.matrix;
+
+import org.apache.giraph.aggregators.AggregatorUsage;
+import org.apache.giraph.master.MasterAggregatorUsage;
+import org.apache.giraph.worker.WorkerAggregatorUsage;
+
+/**
+ * Sums float matrices by keeping one FloatVectorSumAggregator per matrix row.
+ */
+public class FloatMatrixSumAggregator extends MatrixSumAggregator {
+  /** Scratch vector reused by aggregate() to carry one (column, value) */
+  private FloatVector singletonVector = new FloatVector();
+
+  /**
+   * Build a matrix aggregator whose row aggregators share the given name
+   * prefix.
+   *
+   * @param name the prefix for the row vector aggregators
+   */
+  public FloatMatrixSumAggregator(String name) {
+    super(name);
+  }
+
+  /**
+   * Register one float vector sum aggregator per matrix row.
+   *
+   * @param numRows the number of rows
+   * @param master the master to register the aggregators
+   */
+  public void register(int numRows, MasterAggregatorUsage master)
+    throws InstantiationException, IllegalAccessException {
+    for (int row = 0; row < numRows; row++) {
+      String rowName = getRowAggregatorName(row);
+      master.registerAggregator(rowName, FloatVectorSumAggregator.class);
+    }
+  }
+
+  /**
+   * Add a value to one matrix entry through the row's aggregator.
+   * @param i the row
+   * @param j the column
+   * @param v the value
+   * @param worker the worker to aggregate
+   */
+  public void aggregate(int i, int j, float v, WorkerAggregatorUsage worker) {
+    singletonVector.clear();
+    singletonVector.set(j, v);
+    String rowName = getRowAggregatorName(i);
+    worker.aggregate(rowName, singletonVector);
+  }
+
+  /**
+   * Push every row of the given matrix into its row aggregator as the
+   * aggregated value. Typically used on the master, which builds a
+   * FloatMatrix externally and only sets it once complete.
+   *
+   * @param matrix the matrix to set the values
+   * @param master the master
+   */
+  public void setMatrix(FloatMatrix matrix, MasterAggregatorUsage master) {
+    for (int row = 0, end = matrix.getNumRows(); row < end; row++) {
+      String rowName = getRowAggregatorName(row);
+      master.setAggregatedValue(rowName, matrix.getRow(row));
+    }
+  }
+
+  /**
+   * Assemble a matrix from the current values of the row aggregators.
+   * @param numRows the number of rows
+   * @param aggUser the master or worker
+   * @return the float matrix
+   */
+  public FloatMatrix getMatrix(int numRows, AggregatorUsage aggUser) {
+    FloatMatrix result = new FloatMatrix(numRows);
+    for (int row = 0; row < numRows; row++) {
+      String rowName = getRowAggregatorName(row);
+      FloatVector rowVector = aggUser.getAggregatedValue(rowName);
+      result.setRow(row, rowVector);
+    }
+    return result;
+  }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/fa6e3d5a/giraph-core/src/main/java/org/apache/giraph/aggregators/matrix/FloatVector.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/aggregators/matrix/FloatVector.java b/giraph-core/src/main/java/org/apache/giraph/aggregators/matrix/FloatVector.java
new file mode 100644
index 0000000..6efe81e
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/aggregators/matrix/FloatVector.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.aggregators.matrix;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Map.Entry;
+
+import it.unimi.dsi.fastutil.ints.Int2FloatOpenHashMap;
+
+import org.apache.hadoop.io.Writable;
+
+/**
+ * Sparse vector of floats that holds the values of one matrix row.
+ */
+public class FloatVector implements Writable {
+  /**
+   * Backing map; keys are entry indices, values are the entry values
+   */
+  private Int2FloatOpenHashMap entries = null;
+
+  /**
+   * Build a vector whose backing map has the default initial size.
+   */
+  public FloatVector() {
+    initialize(Int2FloatOpenHashMap.DEFAULT_INITIAL_SIZE);
+  }
+
+  /**
+   * Build a vector whose backing map is created with the given size.
+   *
+   * @param size the size of the vector
+   */
+  public FloatVector(int size) {
+    initialize(size);
+  }
+
+  /**
+   * Swap in a new, empty backing map whose default return value is 0.0
+   * @param size the size of the vector
+   */
+  private void initialize(int size) {
+    Int2FloatOpenHashMap fresh = new Int2FloatOpenHashMap(size);
+    fresh.defaultReturnValue(0.0f);
+    entries = fresh;
+  }
+
+  /**
+   * Read one entry of the vector.
+   *
+   * @param i the entry
+   * @return the value of the entry.
+   */
+  float get(int i) {
+    return this.entries.get(i);
+  }
+
+  /**
+   * Store a value at the entry specified.
+   *
+   * @param i the entry
+   * @param value the value to set to the entry
+   */
+  void set(int i, float value) {
+    this.entries.put(i, value);
+  }
+
+  /**
+   * Remove every entry from the vector.
+   */
+  void clear() {
+    this.entries.clear();
+  }
+
+  /**
+   * Element-by-element vector addition of another vector into this one.
+   * @param other the vector to add.
+   */
+  void add(FloatVector other) {
+    for (Entry<Integer, Float> cell : other.entries.entrySet()) {
+      int index = cell.getKey();
+      float increment = cell.getValue();
+      entries.addTo(index, increment);
+    }
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    out.writeInt(entries.size());
+    for (Entry<Integer, Float> cell : entries.entrySet()) {
+      out.writeInt(cell.getKey());
+      out.writeFloat(cell.getValue());
+    }
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    final int count = in.readInt();
+    initialize(count);
+    for (int remaining = count; remaining > 0; remaining--) {
+      int index = in.readInt();
+      float value = in.readFloat();
+      entries.put(index, value);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/fa6e3d5a/giraph-core/src/main/java/org/apache/giraph/aggregators/matrix/FloatVectorSumAggregator.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/aggregators/matrix/FloatVectorSumAggregator.java b/giraph-core/src/main/java/org/apache/giraph/aggregators/matrix/FloatVectorSumAggregator.java
new file mode 100644
index 0000000..b152395
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/aggregators/matrix/FloatVectorSumAggregator.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.aggregators.matrix;
+
+import org.apache.giraph.aggregators.BasicAggregator;
+
+/**
+ * Aggregator that sums float vectors entry by entry.
+ */
+public class FloatVectorSumAggregator extends BasicAggregator<FloatVector> {
+  @Override
+  public void aggregate(FloatVector vector) {
+    FloatVector runningSum = getAggregatedValue();
+    runningSum.add(vector);
+  }
+
+  @Override
+  public FloatVector createInitialValue() {
+    return new FloatVector();
+  }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/fa6e3d5a/giraph-core/src/main/java/org/apache/giraph/aggregators/matrix/IntMatrix.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/aggregators/matrix/IntMatrix.java b/giraph-core/src/main/java/org/apache/giraph/aggregators/matrix/IntMatrix.java
new file mode 100644
index 0000000..624c793
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/aggregators/matrix/IntMatrix.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.aggregators.matrix;
+
+import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
+
+/**
+ * Sparse matrix of ints stored as one {@link IntVector} per row, with the
+ * row vectors held in a map keyed by row index.
+ */
+public class IntMatrix {
+  /** Row count fixed at construction time */
+  private int numRows;
+  /** Row vectors keyed by row index */
+  private Int2ObjectOpenHashMap<IntVector> rows;
+
+  /**
+   * Build a matrix for the given number of rows. No row vector is stored
+   * until {@link #initialize()} or {@code setRow} provides one.
+   * @param numRows the number of rows.
+   */
+  public IntMatrix(int numRows) {
+    this.rows = new Int2ObjectOpenHashMap<IntVector>(numRows);
+    this.rows.defaultReturnValue(null);
+    this.numRows = numRows;
+  }
+
+  /**
+   * Discard all stored rows and give every row a newly constructed vector.
+   */
+  public void initialize() {
+    rows.clear();
+    for (int row = 0; row < numRows; row++) {
+      setRow(row, new IntVector());
+    }
+  }
+
+  /**
+   * Report how many rows this matrix was created with.
+   *
+   * @return the number of rows.
+   */
+  public int getNumRows() {
+    return this.numRows;
+  }
+
+  /**
+   * Look up a single entry; the row must already hold a vector.
+   * @param i the row
+   * @param j the column
+   * @return the value of the entry
+   */
+  public int get(int i, int j) {
+    IntVector rowVector = rows.get(i);
+    return rowVector.get(j);
+  }
+
+  /**
+   * Overwrite a single entry; the row must already hold a vector.
+   * @param i the row
+   * @param j the column
+   * @param v the value of the entry
+   */
+  public void set(int i, int j, int v) {
+    IntVector rowVector = rows.get(i);
+    rowVector.set(j, v);
+  }
+
+  /**
+   * Fetch the vector stored for a row.
+   *
+   * @param i the row number
+   * @return the row vector, or null when none is stored for that row
+   */
+  IntVector getRow(int i) {
+    return this.rows.get(i);
+  }
+
+  /**
+   * Store the given vector as a row, replacing any previous one.
+   *
+   * @param i the row
+   * @param vec the vector to set as the row
+   */
+  void setRow(int i, IntVector vec) {
+    this.rows.put(i, vec);
+  }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/fa6e3d5a/giraph-core/src/main/java/org/apache/giraph/aggregators/matrix/IntMatrixSumAggregator.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/aggregators/matrix/IntMatrixSumAggregator.java b/giraph-core/src/main/java/org/apache/giraph/aggregators/matrix/IntMatrixSumAggregator.java
new file mode 100644
index 0000000..b7afa60
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/aggregators/matrix/IntMatrixSumAggregator.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.aggregators.matrix;
+
+import org.apache.giraph.aggregators.AggregatorUsage;
+import org.apache.giraph.master.MasterAggregatorUsage;
+import org.apache.giraph.worker.WorkerAggregatorUsage;
+
+/**
+ * Helper that aggregates an int matrix through one
+ * {@link IntVectorSumAggregator} per matrix row. The row aggregators are
+ * named by {@code getRowAggregatorName(row)}.
+ */
+public class IntMatrixSumAggregator extends MatrixSumAggregator {
+  /** Scratch vector reused by aggregate() to carry a single entry */
+  private IntVector singletonVector = new IntVector();
+
+  /**
+   * Create a new matrix aggregator whose row vector aggregators share the
+   * given name prefix.
+   *
+   * @param name the prefix for the row vector aggregators
+   */
+  public IntMatrixSumAggregator(String name) {
+    super(name);
+  }
+
+  /**
+   * Register one int vector aggregator per matrix row with the master.
+   *
+   * @param numRows the number of rows
+   * @param master the master to register the aggregators
+   * @throws InstantiationException if thrown by
+   *         {@code master.registerAggregator}
+   * @throws IllegalAccessException if thrown by
+   *         {@code master.registerAggregator}
+   */
+  public void register(int numRows, MasterAggregatorUsage master)
+    throws InstantiationException, IllegalAccessException {
+    int row = 0;
+    while (row < numRows) {
+      String rowName = getRowAggregatorName(row);
+      master.registerAggregator(rowName, IntVectorSumAggregator.class);
+      row++;
+    }
+  }
+
+  /**
+   * Add the given value to the entry specified, by aggregating a vector
+   * that holds only that entry into the aggregator of row {@code i}.
+   *
+   * NOTE(review): the scratch vector is instance state that is cleared and
+   * refilled on every call, so concurrent calls on one instance would
+   * presumably interfere - confirm callers are single-threaded.
+   *
+   * @param i the row
+   * @param j the column
+   * @param v the value
+   * @param worker the worker to aggregate
+   */
+  public void aggregate(int i, int j, int v, WorkerAggregatorUsage worker) {
+    IntVector delta = singletonVector;
+    delta.clear();
+    delta.set(j, v);
+    worker.aggregate(getRowAggregatorName(i), delta);
+  }
+
+  /**
+   * Push every row of the given matrix to the master as the aggregated
+   * value of the matching row aggregator. This is typically used in the
+   * master, to build an external IntMatrix and only set it at the end.
+   *
+   * @param matrix the matrix to set the values
+   * @param master the master
+   */
+  public void setMatrix(IntMatrix matrix, MasterAggregatorUsage master) {
+    for (int row = 0, count = matrix.getNumRows(); row < count; row++) {
+      IntVector values = matrix.getRow(row);
+      master.setAggregatedValue(getRowAggregatorName(row), values);
+    }
+  }
+
+  /**
+   * Read the aggregated values of the matrix, one row aggregator at a time.
+   *
+   * @param numRows the number of rows
+   * @param aggUser the master or worker
+   * @return the int matrix
+   */
+  public IntMatrix getMatrix(int numRows, AggregatorUsage aggUser) {
+    IntMatrix result = new IntMatrix(numRows);
+    int row = 0;
+    while (row < numRows) {
+      String rowName = getRowAggregatorName(row);
+      IntVector rowValues = aggUser.getAggregatedValue(rowName);
+      result.setRow(row, rowValues);
+      row++;
+    }
+    return result;
+  }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/fa6e3d5a/giraph-core/src/main/java/org/apache/giraph/aggregators/matrix/IntVector.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/aggregators/matrix/IntVector.java b/giraph-core/src/main/java/org/apache/giraph/aggregators/matrix/IntVector.java
new file mode 100644
index 0000000..e5bb400
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/aggregators/matrix/IntVector.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.aggregators.matrix;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Map.Entry;
+
+import it.unimi.dsi.fastutil.ints.Int2IntOpenHashMap;
+
+import org.apache.hadoop.io.Writable;
+
+/**
+ * The int vector holds the values of a particular row.
+ */
+public class IntVector implements Writable {
+ /**
+ * The entries of the vector are (key, value) pairs of the form (row, value)
+ */
+ private Int2IntOpenHashMap entries = null;
+
+ /**
+ * Create a new vector with default size.
+ */
+ public IntVector() {
+ initialize(Int2IntOpenHashMap.DEFAULT_INITIAL_SIZE);
+ }
+
+ /**
+ * Create a new vector with given size.
+ *
+ * @param size the size of the vector
+ */
+ public IntVector(int size) {
+ initialize(size);
+ }
+
+ /**
+ * Initialize the values of the vector. The default value is 0.0
+ *
+ * @param size the size of the vector
+ */
+ private void initialize(int size) {
+ entries = new Int2IntOpenHashMap(size);
+ entries.defaultReturnValue(0);
+ }
+
+ /**
+ * Get a particular entry of the vector.
+ *
+ * @param i the entry
+ * @return the value of the entry.
+ */
+ int get(int i) {
+ return entries.get(i);
+ }
+
+ /**
+ * Set the given value to the entry specified.
+ *
+ * @param i the entry
+ * @param value the value to set to the entry
+ */
+ void set(int i, int value) {
+ entries.put(i, value);
+ }
+
+ /**
+ * Clear the contents of the vector.
+ */
+ void clear() {
+ entries.clear();
+ }
+
+ /**
+ * Add the vector specified. This is a vector addition that does an
+ * element-by-element addition.
+ *
+ * @param other the vector to add.
+ */
+ void add(IntVector other) {
+ for (Entry<Integer, Integer> entry : other.entries.entrySet()) {
+ entries.addTo(entry.getKey(), entry.getValue());
+ }
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ out.writeInt(entries.size());
+ for (Entry<Integer, Integer> entry : entries.entrySet()) {
+ out.writeInt(entry.getKey());
+ out.writeInt(entry.getValue());
+ }
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ int size = in.readInt();
+ initialize(size);
+ for (int i = 0; i < size; ++i) {
+ int row = in.readInt();
+ int value = in.readInt();
+ entries.put(row, value);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/fa6e3d5a/giraph-core/src/main/java/org/apache/giraph/aggregators/matrix/IntVectorSumAggregator.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/aggregators/matrix/IntVectorSumAggregator.java b/giraph-core/src/main/java/org/apache/giraph/aggregators/matrix/IntVectorSumAggregator.java
new file mode 100644
index 0000000..b588331
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/aggregators/matrix/IntVectorSumAggregator.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.aggregators.matrix;
+
+import org.apache.giraph.aggregators.BasicAggregator;
+
+/**
+ * The float vector aggregator is used to aggregate float vectors.
+ */
+public class IntVectorSumAggregator extends BasicAggregator<IntVector> {
+
+ @Override
+ public IntVector createInitialValue() {
+ return new IntVector();
+ }
+
+ @Override
+ public void aggregate(IntVector vector) {
+ getAggregatedValue().add(vector);
+ }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/fa6e3d5a/giraph-core/src/main/java/org/apache/giraph/aggregators/matrix/LongMatrix.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/aggregators/matrix/LongMatrix.java b/giraph-core/src/main/java/org/apache/giraph/aggregators/matrix/LongMatrix.java
new file mode 100644
index 0000000..dbc3ecb
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/aggregators/matrix/LongMatrix.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.aggregators.matrix;
+
+import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
+
+/**
+ * A long matrix holds the values of the entries in long vectors. It keeps one
+ * long aggregator per matrix row.
+ */
+public class LongMatrix {
+ /** The number of rows in the matrix */
+ private int numRows;
+ /** The rows of the matrix */
+ private Int2ObjectOpenHashMap<LongVector> rows;
+
+ /**
+ * Create a new matrix with the given number of rows.
+ *
+ * @param numRows the number of rows.
+ */
+ public LongMatrix(int numRows) {
+ this.numRows = numRows;
+ rows = new Int2ObjectOpenHashMap<LongVector>(numRows);
+ rows.defaultReturnValue(null);
+ }
+
+ /**
+ * Create a empty matrix with all values set to 0.0
+ */
+ public void initialize() {
+ rows.clear();
+ for (int i = 0; i < numRows; ++i) {
+ setRow(i, new LongVector());
+ }
+ }
+
+ /**
+ * Get the number of rows in the matrix.
+ *
+ * @return the number of rows.
+ */
+ public int getNumRows() {
+ return numRows;
+ }
+
+ /**
+ * Get a specific entry of the matrix.
+ *
+ * @param i the row
+ * @param j the column
+ * @return the value of the entry
+ */
+ public long get(int i, int j) {
+ return rows.get(i).get(j);
+ }
+
+ /**
+ * Set a specific entry of the matrix.
+ *
+ * @param i the row
+ * @param j the column
+ * @param v the value of the entry
+ */
+ public void set(int i, int j, long v) {
+ rows.get(i).set(j, v);
+ }
+
+ /**
+ * Get a specific row of the matrix.
+ *
+ * @param i the row number
+ * @return the row of the matrix
+ */
+ LongVector getRow(int i) {
+ return rows.get(i);
+ }
+
+ /**
+ * Set the long vector as the row specified.
+ *
+ * @param i the row
+ * @param vec the vector to set as the row
+ */
+ void setRow(int i, LongVector vec) {
+ rows.put(i, vec);
+ }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/fa6e3d5a/giraph-core/src/main/java/org/apache/giraph/aggregators/matrix/LongMatrixSumAggregator.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/aggregators/matrix/LongMatrixSumAggregator.java b/giraph-core/src/main/java/org/apache/giraph/aggregators/matrix/LongMatrixSumAggregator.java
new file mode 100644
index 0000000..a7dc186
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/aggregators/matrix/LongMatrixSumAggregator.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.aggregators.matrix;
+
+import org.apache.giraph.aggregators.AggregatorUsage;
+import org.apache.giraph.master.MasterAggregatorUsage;
+import org.apache.giraph.worker.WorkerAggregatorUsage;
+
+/**
+ * Helper that aggregates a long matrix through one
+ * {@link LongVectorSumAggregator} per matrix row. The row aggregators are
+ * named by {@code getRowAggregatorName(row)}.
+ */
+public class LongMatrixSumAggregator extends MatrixSumAggregator {
+  /** Scratch vector reused by aggregate() to carry a single entry */
+  private LongVector singletonVector = new LongVector();
+
+  /**
+   * Create a new matrix aggregator whose row vector aggregators share the
+   * given name prefix.
+   *
+   * @param name the prefix for the row vector aggregators
+   */
+  public LongMatrixSumAggregator(String name) {
+    super(name);
+  }
+
+  /**
+   * Register one long vector aggregator per matrix row with the master.
+   *
+   * @param numRows the number of rows
+   * @param master the master to register the aggregators
+   * @throws InstantiationException if thrown by
+   *         {@code master.registerAggregator}
+   * @throws IllegalAccessException if thrown by
+   *         {@code master.registerAggregator}
+   */
+  public void register(int numRows, MasterAggregatorUsage master)
+    throws InstantiationException, IllegalAccessException {
+    int row = 0;
+    while (row < numRows) {
+      String rowName = getRowAggregatorName(row);
+      master.registerAggregator(rowName, LongVectorSumAggregator.class);
+      row++;
+    }
+  }
+
+  /**
+   * Add the given value to the entry specified, by aggregating a vector
+   * that holds only that entry into the aggregator of row {@code i}.
+   *
+   * NOTE(review): the scratch vector is instance state that is cleared and
+   * refilled on every call, so concurrent calls on one instance would
+   * presumably interfere - confirm callers are single-threaded.
+   *
+   * @param i the row
+   * @param j the column
+   * @param v the value
+   * @param worker the worker to aggregate
+   */
+  public void aggregate(int i, int j, long v, WorkerAggregatorUsage worker) {
+    LongVector delta = singletonVector;
+    delta.clear();
+    delta.set(j, v);
+    worker.aggregate(getRowAggregatorName(i), delta);
+  }
+
+  /**
+   * Push every row of the given matrix to the master as the aggregated
+   * value of the matching row aggregator. This is typically used in the
+   * master, to build an external LongMatrix and only set it at the end.
+   *
+   * @param matrix the matrix to set the values
+   * @param master the master
+   */
+  public void setMatrix(LongMatrix matrix, MasterAggregatorUsage master) {
+    for (int row = 0, count = matrix.getNumRows(); row < count; row++) {
+      LongVector values = matrix.getRow(row);
+      master.setAggregatedValue(getRowAggregatorName(row), values);
+    }
+  }
+
+  /**
+   * Read the aggregated values of the matrix, one row aggregator at a time.
+   *
+   * @param numRows the number of rows
+   * @param aggUser the master or worker
+   * @return the long matrix
+   */
+  public LongMatrix getMatrix(int numRows, AggregatorUsage aggUser) {
+    LongMatrix result = new LongMatrix(numRows);
+    int row = 0;
+    while (row < numRows) {
+      String rowName = getRowAggregatorName(row);
+      LongVector rowValues = aggUser.getAggregatedValue(rowName);
+      result.setRow(row, rowValues);
+      row++;
+    }
+    return result;
+  }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/fa6e3d5a/giraph-core/src/main/java/org/apache/giraph/aggregators/matrix/LongVector.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/aggregators/matrix/LongVector.java b/giraph-core/src/main/java/org/apache/giraph/aggregators/matrix/LongVector.java
new file mode 100644
index 0000000..6781b43
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/aggregators/matrix/LongVector.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.aggregators.matrix;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Map.Entry;
+
+import it.unimi.dsi.fastutil.ints.Int2LongOpenHashMap;
+
+import org.apache.hadoop.io.Writable;
+
+/**
+ * The long vector holds the values of a particular row.
+ */
+public class LongVector implements Writable {
+ /**
+ * The entries of the vector are (key, value) pairs of the form (row, value)
+ */
+ private Int2LongOpenHashMap entries = null;
+
+ /**
+ * Create a new vector with default size.
+ */
+ public LongVector() {
+ initialize(Int2LongOpenHashMap.DEFAULT_INITIAL_SIZE);
+ }
+
+ /**
+ * Create a new vector with given size.
+ *
+ * @param size the size of the vector
+ */
+ public LongVector(int size) {
+ initialize(size);
+ }
+
+ /**
+ * Initialize the values of the vector. The default value is 0.0
+ *
+ * @param size the size of the vector
+ */
+ private void initialize(int size) {
+ entries = new Int2LongOpenHashMap(size);
+ entries.defaultReturnValue(0L);
+ }
+
+ /**
+ * Get a particular entry of the vector.
+ *
+ * @param i the entry
+ * @return the value of the entry.
+ */
+ long get(int i) {
+ return entries.get(i);
+ }
+
+ /**
+ * Set the given value to the entry specified.
+ *
+ * @param i the entry
+ * @param value the value to set to the entry
+ */
+ void set(int i, long value) {
+ entries.put(i, value);
+ }
+
+ /**
+ * Clear the contents of the vector.
+ */
+ void clear() {
+ entries.clear();
+ }
+
+ /**
+ * Add the vector specified. This is a vector addition that does an
+ * element-by-element addition.
+ *
+ * @param other the vector to add.
+ */
+ void add(LongVector other) {
+ for (Entry<Integer, Long> kv : other.entries.entrySet()) {
+ entries.addTo(kv.getKey(), kv.getValue());
+ }
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ out.writeInt(entries.size());
+ for (Entry<Integer, Long> kv : entries.entrySet()) {
+ out.writeInt(kv.getKey());
+ out.writeLong(kv.getValue());
+ }
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ int size = in.readInt();
+ initialize(size);
+ for (int i = 0; i < size; ++i) {
+ int row = in.readInt();
+ long value = in.readLong();
+ entries.put(row, value);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/fa6e3d5a/giraph-core/src/main/java/org/apache/giraph/aggregators/matrix/LongVectorSumAggregator.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/aggregators/matrix/LongVectorSumAggregator.java b/giraph-core/src/main/java/org/apache/giraph/aggregators/matrix/LongVectorSumAggregator.java
new file mode 100644
index 0000000..ed35e15
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/aggregators/matrix/LongVectorSumAggregator.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.aggregators.matrix;
+
+import org.apache.giraph.aggregators.BasicAggregator;
+
+/**
+ * The long vector aggregator is used to aggregate long vectors.
+ */
+public class LongVectorSumAggregator extends BasicAggregator<LongVector> {
+
+ @Override
+ public LongVector createInitialValue() {
+ return new LongVector();
+ }
+
+ @Override
+ public void aggregate(LongVector vector) {
+ getAggregatedValue().add(vector);
+ }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/fa6e3d5a/giraph-core/src/main/java/org/apache/giraph/aggregators/matrix/MatrixSumAggregator.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/aggregators/matrix/MatrixSumAggregator.java b/giraph-core/src/main/java/org/apache/giraph/aggregators/matrix/MatrixSumAggregator.java
new file mode 100644
index 0000000..3864472
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/aggregators/matrix/MatrixSumAggregator.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.aggregators.matrix;
+
+/**
+ * The abstract matrix aggregator contains the prefix name of the vector
+ * aggregators the have the values of the rows.
+ */
+public abstract class MatrixSumAggregator {
+ /**
+ * The prefix name of the double vector aggregators. The aggregator names are
+ * created as (name0, name1, ...).
+ */
+ private String name;
+
+ /**
+ * Create a new matrix aggregator with the given prefix name for the vector
+ * aggregators.
+ *
+ * @param name the prefix for the row vector aggregators
+ */
+ public MatrixSumAggregator(String name) {
+ this.name = name;
+ }
+
+ /**
+ * Get the name of the aggreagator of the row with the index specified.
+ *
+ * @param i the row of the matrix
+ * @return the name of the aggregator
+ */
+ protected String getRowAggregatorName(int i) {
+ return name + i;
+ }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/fa6e3d5a/giraph-core/src/main/java/org/apache/giraph/aggregators/matrix/package-info.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/aggregators/matrix/package-info.java b/giraph-core/src/main/java/org/apache/giraph/aggregators/matrix/package-info.java
new file mode 100644
index 0000000..4297db7
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/aggregators/matrix/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Package of matrix aggregator.
+ */
+package org.apache.giraph.aggregators.matrix;
http://git-wip-us.apache.org/repos/asf/giraph/blob/fa6e3d5a/giraph-core/src/test/java/org/apache/giraph/aggregators/matrix/TestDoubleMatrix.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/aggregators/matrix/TestDoubleMatrix.java b/giraph-core/src/test/java/org/apache/giraph/aggregators/matrix/TestDoubleMatrix.java
new file mode 100644
index 0000000..d67eda1
--- /dev/null
+++ b/giraph-core/src/test/java/org/apache/giraph/aggregators/matrix/TestDoubleMatrix.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.aggregators.matrix;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.giraph.utils.WritableUtils;
+import org.junit.Test;
+
+/**
+ * Unit tests for {@link DoubleVector}: default values, get/set, vector
+ * addition and the serialization round trip.
+ */
+public class TestDoubleMatrix {
+  /**
+   * Tolerance for comparing doubles. Declared as a final double literal so
+   * it is exactly 0.0001 rather than a widened float.
+   */
+  private static final double E = 0.0001;
+
+  /** Checks default values, get/set and element-by-element addition. */
+  @Test
+  public void testVectorAdd() {
+    // The default value should be 0
+    DoubleVector vec1 = new DoubleVector();
+    assertEquals(0.0, vec1.get(0), E);
+
+    // Basic get/set
+    vec1.set(0, 0.1);
+    vec1.set(10, 1.4);
+    assertEquals(0.1, vec1.get(0), E);
+    assertEquals(0.0, vec1.get(5), E);
+    assertEquals(1.4, vec1.get(10), E);
+
+    // Add another vector
+    DoubleVector vec2 = new DoubleVector();
+    vec2.set(0, 0.5);
+    vec2.set(5, 1.7);
+
+    vec1.add(vec2);
+    assertEquals(0.6, vec1.get(0), E);
+    assertEquals(1.7, vec1.get(5), E);
+    assertEquals(1.4, vec1.get(10), E);
+    assertEquals(0.0, vec1.get(15), E);
+  }
+
+  /** Checks that a vector survives a write/readFields round trip. */
+  @Test
+  public void testVectorSerialize() throws Exception {
+    int size = 100;
+
+    // Serialize from
+    DoubleVector from = new DoubleVector(size);
+    from.set(0, 10.0);
+    from.set(10, 5.0);
+    from.set(12, 1.0);
+    byte[] data = WritableUtils.writeToByteArray(from);
+
+    // De-serialize to
+    DoubleVector to = new DoubleVector();
+    WritableUtils.readFieldsFromByteArray(data, to);
+
+    // The vectors should be equal
+    for (int i = 0; i < size; ++i) {
+      assertEquals(from.get(i), to.get(i), E);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/fa6e3d5a/giraph-core/src/test/java/org/apache/giraph/aggregators/matrix/TestFloatMatrix.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/aggregators/matrix/TestFloatMatrix.java b/giraph-core/src/test/java/org/apache/giraph/aggregators/matrix/TestFloatMatrix.java
new file mode 100644
index 0000000..d0f9bb0
--- /dev/null
+++ b/giraph-core/src/test/java/org/apache/giraph/aggregators/matrix/TestFloatMatrix.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.aggregators.matrix;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.giraph.utils.WritableUtils;
+import org.junit.Test;
+
+/**
+ * Unit tests for {@link FloatVector}: default values, get/set, vector
+ * addition and the serialization round trip.
+ */
+public class TestFloatMatrix {
+  /** Tolerance for comparing floating point values. */
+  private static float E = 0.0001f;
+
+  /** Checks default values, get/set and element-by-element addition. */
+  @Test
+  public void testVectorAdd() {
+    // An untouched vector reads as 0 everywhere
+    FloatVector base = new FloatVector();
+    assertEquals(0.0, base.get(0), E);
+
+    // Values that were set come back; the gap in between stays 0
+    base.set(0, 0.1f);
+    base.set(10, 1.4f);
+    assertEquals(0.1, base.get(0), E);
+    assertEquals(0.0, base.get(5), E);
+    assertEquals(1.4, base.get(10), E);
+
+    // Second operand overlaps at index 0 and adds a new entry at index 5
+    FloatVector addend = new FloatVector();
+    addend.set(0, 0.5f);
+    addend.set(5, 1.7f);
+
+    base.add(addend);
+    assertEquals(0.6, base.get(0), E);
+    assertEquals(1.7, base.get(5), E);
+    assertEquals(1.4, base.get(10), E);
+    assertEquals(0.0, base.get(15), E);
+  }
+
+  /** Checks that a vector survives a write/readFields round trip. */
+  @Test
+  public void testVectorSerialize() throws Exception {
+    int size = 100;
+
+    // Source vector with a few entries, written to bytes
+    FloatVector original = new FloatVector(size);
+    original.set(0, 10.0f);
+    original.set(10, 5.0f);
+    original.set(12, 1.0f);
+    byte[] bytes = WritableUtils.writeToByteArray(original);
+
+    // Read the bytes back into a fresh vector
+    FloatVector copy = new FloatVector();
+    WritableUtils.readFieldsFromByteArray(bytes, copy);
+
+    // Every index must match, including the ones never set
+    for (int index = 0; index < size; ++index) {
+      assertEquals(original.get(index), copy.get(index), E);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/fa6e3d5a/giraph-core/src/test/java/org/apache/giraph/aggregators/matrix/TestIntMatrix.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/aggregators/matrix/TestIntMatrix.java b/giraph-core/src/test/java/org/apache/giraph/aggregators/matrix/TestIntMatrix.java
new file mode 100644
index 0000000..e8d3561
--- /dev/null
+++ b/giraph-core/src/test/java/org/apache/giraph/aggregators/matrix/TestIntMatrix.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.aggregators.matrix;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.giraph.utils.WritableUtils;
+import org.junit.Test;
+
+/** Unit tests for {@link IntVector}: element-wise add and serialization. */
+public class TestIntMatrix {
+  /** Unset entries must read as 0; add() must sum into the receiver. */
+  @Test
+  public void testVectorAdd() {
+    // Nothing has been stored yet, so index 0 reads back as 0
+    IntVector sum = new IntVector();
+    assertEquals(0, sum.get(0));
+
+    // Store two entries; the untouched index between them stays 0
+    sum.set(0, 1);
+    sum.set(10, 14);
+    assertEquals(1, sum.get(0));
+    assertEquals(0, sum.get(5));
+    assertEquals(14, sum.get(10));
+
+    // The operand overlaps at index 0 and contributes a new index 5
+    IntVector addend = new IntVector();
+    addend.set(0, 5);
+    addend.set(5, 17);
+    sum.add(addend);
+
+    assertEquals(6, sum.get(0));
+    assertEquals(17, sum.get(5));
+    assertEquals(14, sum.get(10));
+    assertEquals(0, sum.get(15));
+  }
+
+  /** A vector written to bytes and read back must keep the same entries. */
+  @Test
+  public void testVectorSerialize() throws Exception {
+    final int length = 100;
+    IntVector original = new IntVector(length);
+    original.set(0, 10);
+    original.set(10, 5);
+    original.set(12, 1);
+    byte[] bytes = WritableUtils.writeToByteArray(original);
+
+    // Read the bytes into a vector built with the default constructor
+    IntVector restored = new IntVector();
+    WritableUtils.readFieldsFromByteArray(bytes, restored);
+
+    // Compare every index in [0, length), not only the three that were set
+    for (int index = 0; index < length; index++) {
+      assertEquals(original.get(index), restored.get(index));
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/fa6e3d5a/giraph-core/src/test/java/org/apache/giraph/aggregators/matrix/TestLongMatrix.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/aggregators/matrix/TestLongMatrix.java b/giraph-core/src/test/java/org/apache/giraph/aggregators/matrix/TestLongMatrix.java
new file mode 100644
index 0000000..a0a7000
--- /dev/null
+++ b/giraph-core/src/test/java/org/apache/giraph/aggregators/matrix/TestLongMatrix.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.aggregators.matrix;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.giraph.utils.WritableUtils;
+import org.junit.Test;
+
+/** Unit tests for {@link LongVector}: element-wise add and serialization. */
+public class TestLongMatrix {
+  /** Unset entries must read as 0; add() must sum into the receiver. */
+  @Test
+  public void testVectorAdd() {
+    // Nothing has been stored yet, so index 0 reads back as 0
+    LongVector sum = new LongVector();
+    assertEquals(0, sum.get(0));
+
+    // Store two entries; the untouched index between them stays 0
+    sum.set(0, 1);
+    sum.set(10, 14);
+    assertEquals(1, sum.get(0));
+    assertEquals(0, sum.get(5));
+    assertEquals(14, sum.get(10));
+
+    // The operand overlaps at index 0 and contributes a new index 5
+    LongVector addend = new LongVector();
+    addend.set(0, 5);
+    addend.set(5, 17);
+    sum.add(addend);
+
+    assertEquals(6, sum.get(0));
+    assertEquals(17, sum.get(5));
+    assertEquals(14, sum.get(10));
+    assertEquals(0, sum.get(15));
+  }
+
+  /** A vector written to bytes and read back must keep the same entries. */
+  @Test
+  public void testVectorSerialize() throws Exception {
+    final int length = 100;
+    LongVector original = new LongVector(length);
+    original.set(0, 10);
+    original.set(10, 5);
+    original.set(12, 1);
+    byte[] bytes = WritableUtils.writeToByteArray(original);
+
+    // Read the bytes into a vector built with the default constructor
+    LongVector restored = new LongVector();
+    WritableUtils.readFieldsFromByteArray(bytes, restored);
+
+    // Compare every index in [0, length), not only the three that were set
+    for (int index = 0; index < length; index++) {
+      assertEquals(original.get(index), restored.get(index));
+    }
+  }
+}