You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ac...@apache.org on 2011/12/22 07:53:45 UTC
svn commit: r1222071 - in /incubator/giraph/trunk: CHANGELOG
src/main/java/org/apache/giraph/graph/LongDoubleFloatDoubleVertex.java
Author: aching
Date: Thu Dec 22 06:53:44 2011
New Revision: 1222071
URL: http://svn.apache.org/viewvc?rev=1222071&view=rev
Log:
GIRAPH-112: Use elements() properly in LongDoubleFloatDoubleVertex.
(aching)
Modified:
incubator/giraph/trunk/CHANGELOG
incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/LongDoubleFloatDoubleVertex.java
Modified: incubator/giraph/trunk/CHANGELOG
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/CHANGELOG?rev=1222071&r1=1222070&r2=1222071&view=diff
==============================================================================
--- incubator/giraph/trunk/CHANGELOG (original)
+++ incubator/giraph/trunk/CHANGELOG Thu Dec 22 06:53:44 2011
@@ -2,6 +2,9 @@ Giraph Change Log
Release 0.70.0 - unreleased
+ GIRAPH-112: Use elements() properly in LongDoubleFloatDoubleVertex.
+ (aching)
+
GIRAPH-114: Inconsistent message map handling in
BasicRPCCommunications.LargeMessageFlushExecutor. (ssc via aching)
Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/LongDoubleFloatDoubleVertex.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/LongDoubleFloatDoubleVertex.java?rev=1222071&r1=1222070&r2=1222071&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/LongDoubleFloatDoubleVertex.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/LongDoubleFloatDoubleVertex.java Thu Dec 22 06:53:44 2011
@@ -22,6 +22,9 @@ import org.apache.hadoop.io.DoubleWritab
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.log4j.Logger;
+import org.apache.mahout.math.function.DoubleProcedure;
+import org.apache.mahout.math.function.LongFloatProcedure;
+import org.apache.mahout.math.function.LongProcedure;
import org.apache.mahout.math.list.DoubleArrayList;
import org.apache.mahout.math.map.OpenLongFloatHashMap;
@@ -33,38 +36,45 @@ import java.util.List;
import java.util.Map;
public abstract class LongDoubleFloatDoubleVertex extends
- MutableVertex<LongWritable, DoubleWritable, FloatWritable, DoubleWritable> {
+ MutableVertex<LongWritable, DoubleWritable, FloatWritable,
+ DoubleWritable> {
/** Class logger */
- private static final Logger LOG = Logger.getLogger(LongDoubleFloatDoubleVertex.class);
+ private static final Logger LOG =
+ Logger.getLogger(LongDoubleFloatDoubleVertex.class);
private long vertexId;
private double vertexValue;
- private OpenLongFloatHashMap verticesWithEdgeValues = new OpenLongFloatHashMap();
+ private OpenLongFloatHashMap verticesWithEdgeValues =
+ new OpenLongFloatHashMap();
private DoubleArrayList messageList = new DoubleArrayList();
@Override
public void initialize(LongWritable vertexIdW, DoubleWritable vertexValueW,
- Map<LongWritable, FloatWritable> edgesW, List<DoubleWritable> messagesW) {
- if (vertexIdW != null ) {
- vertexId = vertexIdW.get();
- }
- if (vertexValueW != null) {
- vertexValue = vertexValueW.get();
- }
- if (edgesW != null) {
- for(Map.Entry<LongWritable, FloatWritable> entry : edgesW.entrySet()) {
- verticesWithEdgeValues.put(entry.getKey().get(), entry.getValue().get());
- }
- }
- if (messagesW != null) {
- for(DoubleWritable m : messagesW) {
- messageList.add(m.get());
+ Map<LongWritable, FloatWritable> edgesW,
+ List<DoubleWritable> messagesW) {
+ if (vertexIdW != null ) {
+ vertexId = vertexIdW.get();
+ }
+ if (vertexValueW != null) {
+ vertexValue = vertexValueW.get();
+ }
+ if (edgesW != null) {
+ for (Map.Entry<LongWritable, FloatWritable> entry :
+ edgesW.entrySet()) {
+ verticesWithEdgeValues.put(entry.getKey().get(),
+ entry.getValue().get());
+ }
+ }
+ if (messagesW != null) {
+ for(DoubleWritable m : messagesW) {
+ messageList.add(m.get());
+ }
}
- }
}
@Override
- public final boolean addEdge(LongWritable targetId, FloatWritable edgeValue) {
+ public final boolean addEdge(LongWritable targetId,
+ FloatWritable edgeValue) {
if (verticesWithEdgeValues.put(targetId.get(), edgeValue.get())) {
if (LOG.isDebugEnabled()) {
LOG.debug("addEdge: Vertex=" + vertexId +
@@ -96,7 +106,8 @@ public abstract class LongDoubleFloatDou
@Override
public final LongWritable getVertexId() {
- return new LongWritable(vertexId); // TODO: possibly not make new objects every time?
+ // TODO: possibly not make new objects every time?
+ return new LongWritable(vertexId);
}
@Override
@@ -119,16 +130,22 @@ public abstract class LongDoubleFloatDou
}
@Override
- public final void sendMsgToAllEdges(DoubleWritable msg) {
+ public final void sendMsgToAllEdges(final DoubleWritable msg) {
if (msg == null) {
throw new IllegalArgumentException(
- "sendMsgToAllEdges: Cannot send null message to all edges");
- }
- LongWritable destVertex = new LongWritable();
- for (long destVertexId : verticesWithEdgeValues.keys().elements()) {
- destVertex.set(destVertexId);
- sendMsg(destVertex, msg);
+ "sendMsgToAllEdges: Cannot send null message to all edges");
}
+ final LongWritable destVertex = new LongWritable();
+ final MutableVertex<LongWritable, DoubleWritable, FloatWritable,
+ DoubleWritable> vertex = this;
+ verticesWithEdgeValues.forEachKey(new LongProcedure() {
+ @Override
+ public boolean apply(long destVertexId) {
+ destVertex.set(destVertexId);
+ vertex.sendMsg(destVertex, msg);
+ return true;
+ }
+ });
}
@Override
@@ -144,10 +161,11 @@ public abstract class LongDoubleFloatDou
@Override
public Iterator<LongWritable> iterator() {
final long[] destVertices = verticesWithEdgeValues.keys().elements();
+ final int destVerticesSize = verticesWithEdgeValues.size();
return new Iterator<LongWritable>() {
int offset = 0;
@Override public boolean hasNext() {
- return offset < destVertices.length;
+ return offset < destVerticesSize;
}
@Override public LongWritable next() {
@@ -156,7 +174,7 @@ public abstract class LongDoubleFloatDou
@Override public void remove() {
throw new UnsupportedOperationException(
- "Mutation disallowed for edge list via iterator");
+ "Mutation disallowed for edge list via iterator");
}
};
}
@@ -183,7 +201,8 @@ public abstract class LongDoubleFloatDou
}
@Override
- public void addVertexRequest(MutableVertex<LongWritable, DoubleWritable, FloatWritable, DoubleWritable> vertex)
+ public void addVertexRequest(MutableVertex<LongWritable, DoubleWritable,
+ FloatWritable, DoubleWritable> vertex)
throws IOException {
getGraphState().getWorkerCommunications().addVertexReq(vertex);
}
@@ -197,12 +216,14 @@ public abstract class LongDoubleFloatDou
public void addEdgeRequest(LongWritable vertexIndex,
Edge<LongWritable, FloatWritable> edge)
throws IOException {
- getGraphState().getWorkerCommunications().addEdgeReq(vertexIndex, edge);
+ getGraphState().getWorkerCommunications().addEdgeReq(vertexIndex,
+ edge);
}
@Override
public void removeEdgeRequest(LongWritable sourceVertexId,
- LongWritable destVertexId) throws IOException {
+ LongWritable destVertexId)
+ throws IOException {
getGraphState().getWorkerCommunications().removeEdgeReq(
sourceVertexId, destVertexId);
}
@@ -225,19 +246,36 @@ public abstract class LongDoubleFloatDou
}
@Override
- public final void write(DataOutput out) throws IOException {
+ public final void write(final DataOutput out) throws IOException {
out.writeLong(vertexId);
out.writeDouble(vertexValue);
out.writeLong(verticesWithEdgeValues.size());
- for(long destVertexId : verticesWithEdgeValues.keys().elements()) {
- float edgeValue = verticesWithEdgeValues.get(destVertexId);
- out.writeLong(destVertexId);
- out.writeFloat(edgeValue);
- }
+ verticesWithEdgeValues.forEachPair(new LongFloatProcedure() {
+ @Override
+ public boolean apply(long destVertexId, float edgeValue) {
+ try {
+ out.writeLong(destVertexId);
+ out.writeFloat(edgeValue);
+ } catch (IOException e) {
+ throw new IllegalStateException(
+ "apply: IOException when not allowed", e);
+ }
+ return true;
+ }
+ });
out.writeLong(messageList.size());
- for(double msg : messageList.elements()) {
- out.writeDouble(msg);
- }
+ messageList.forEach(new DoubleProcedure() {
+ @Override
+ public boolean apply(double message) {
+ try {
+ out.writeDouble(message);
+ } catch (IOException e) {
+ throw new IllegalStateException(
+ "apply: IOException when not allowed", e);
+ }
+ return true;
+ }
+ });
out.writeBoolean(halt);
}
@@ -271,36 +309,35 @@ public abstract class LongDoubleFloatDou
private final DoubleArrayList elementList;
- public UnmodifiableDoubleWritableIterable(DoubleArrayList elementList) {
+ public UnmodifiableDoubleWritableIterable(
+ DoubleArrayList elementList) {
this.elementList = elementList;
}
@Override
public Iterator<DoubleWritable> iterator() {
return new UnmodifiableDoubleWritableIterator(
- elementList.elements());
+ elementList);
}
}
private class UnmodifiableDoubleWritableIterator
extends UnmodifiableIterator<DoubleWritable> {
+ private final DoubleArrayList elementList;
+ private int offset = 0;
- private final double[] elements;
- private int offset;
-
- UnmodifiableDoubleWritableIterator(double[] elements) {
- offset = 0;
- this.elements = elements;
+ UnmodifiableDoubleWritableIterator(DoubleArrayList elementList) {
+ this.elementList = elementList;
}
@Override
public boolean hasNext() {
- return offset < elements.length;
+ return offset < elementList.size();
}
@Override
public DoubleWritable next() {
- return new DoubleWritable(elements[offset++]);
+ return new DoubleWritable(elementList.get(offset++));
}
}
}