You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ac...@apache.org on 2011/12/22 07:53:45 UTC

svn commit: r1222071 - in /incubator/giraph/trunk: CHANGELOG src/main/java/org/apache/giraph/graph/LongDoubleFloatDoubleVertex.java

Author: aching
Date: Thu Dec 22 06:53:44 2011
New Revision: 1222071

URL: http://svn.apache.org/viewvc?rev=1222071&view=rev
Log:
GIRAPH-112: Use elements() properly in LongDoubleFloatDoubleVertex.
(aching)


Modified:
    incubator/giraph/trunk/CHANGELOG
    incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/LongDoubleFloatDoubleVertex.java

Modified: incubator/giraph/trunk/CHANGELOG
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/CHANGELOG?rev=1222071&r1=1222070&r2=1222071&view=diff
==============================================================================
--- incubator/giraph/trunk/CHANGELOG (original)
+++ incubator/giraph/trunk/CHANGELOG Thu Dec 22 06:53:44 2011
@@ -2,6 +2,9 @@ Giraph Change Log
 
 Release 0.70.0 - unreleased
 
+  GIRAPH-112: Use elements() properly in LongDoubleFloatDoubleVertex.
+  (aching)
+
   GIRAPH-114: Inconsistent message map handling in
   BasicRPCCommunications.LargeMessageFlushExecutor. (ssc via aching)
 

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/LongDoubleFloatDoubleVertex.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/LongDoubleFloatDoubleVertex.java?rev=1222071&r1=1222070&r2=1222071&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/LongDoubleFloatDoubleVertex.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/LongDoubleFloatDoubleVertex.java Thu Dec 22 06:53:44 2011
@@ -22,6 +22,9 @@ import org.apache.hadoop.io.DoubleWritab
 import org.apache.hadoop.io.FloatWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.log4j.Logger;
+import org.apache.mahout.math.function.DoubleProcedure;
+import org.apache.mahout.math.function.LongFloatProcedure;
+import org.apache.mahout.math.function.LongProcedure;
 import org.apache.mahout.math.list.DoubleArrayList;
 import org.apache.mahout.math.map.OpenLongFloatHashMap;
 
@@ -33,38 +36,45 @@ import java.util.List;
 import java.util.Map;
 
 public abstract class LongDoubleFloatDoubleVertex extends
-        MutableVertex<LongWritable, DoubleWritable, FloatWritable, DoubleWritable> {
+        MutableVertex<LongWritable, DoubleWritable, FloatWritable,
+        DoubleWritable> {
     /** Class logger */
-    private static final Logger LOG = Logger.getLogger(LongDoubleFloatDoubleVertex.class);
+    private static final Logger LOG =
+        Logger.getLogger(LongDoubleFloatDoubleVertex.class);
 
     private long vertexId;
     private double vertexValue;
-    private OpenLongFloatHashMap verticesWithEdgeValues = new OpenLongFloatHashMap();
+    private OpenLongFloatHashMap verticesWithEdgeValues =
+        new OpenLongFloatHashMap();
     private DoubleArrayList messageList = new DoubleArrayList();
 
     @Override
     public void initialize(LongWritable vertexIdW, DoubleWritable vertexValueW,
-        Map<LongWritable, FloatWritable> edgesW, List<DoubleWritable> messagesW) {
-      if (vertexIdW != null ) {
-        vertexId = vertexIdW.get();
-      }
-      if (vertexValueW != null) {
-        vertexValue = vertexValueW.get();
-      }
-      if (edgesW != null) {
-        for(Map.Entry<LongWritable, FloatWritable> entry : edgesW.entrySet()) {
-         verticesWithEdgeValues.put(entry.getKey().get(), entry.getValue().get());
-        }
-      }
-      if (messagesW != null) {
-        for(DoubleWritable m : messagesW) {
-          messageList.add(m.get());
+                           Map<LongWritable, FloatWritable> edgesW,
+                           List<DoubleWritable> messagesW) {
+        if (vertexIdW != null ) {
+            vertexId = vertexIdW.get();
+        }
+        if (vertexValueW != null) {
+            vertexValue = vertexValueW.get();
+        }
+        if (edgesW != null) {
+            for (Map.Entry<LongWritable, FloatWritable> entry :
+                    edgesW.entrySet()) {
+                verticesWithEdgeValues.put(entry.getKey().get(),
+                                           entry.getValue().get());
+            }
+        }
+        if (messagesW != null) {
+            for(DoubleWritable m : messagesW) {
+                messageList.add(m.get());
+            }
         }
-      }
     }
 
     @Override
-    public final boolean addEdge(LongWritable targetId, FloatWritable edgeValue) {
+    public final boolean addEdge(LongWritable targetId,
+                                 FloatWritable edgeValue) {
         if (verticesWithEdgeValues.put(targetId.get(), edgeValue.get())) {
             if (LOG.isDebugEnabled()) {
                 LOG.debug("addEdge: Vertex=" + vertexId +
@@ -96,7 +106,8 @@ public abstract class LongDoubleFloatDou
 
     @Override
     public final LongWritable getVertexId() {
-        return new LongWritable(vertexId); // TODO: possibly not make new objects every time?
+        // TODO: possibly not make new objects every time?
+        return new LongWritable(vertexId);
     }
 
     @Override
@@ -119,16 +130,22 @@ public abstract class LongDoubleFloatDou
     }
 
     @Override
-    public final void sendMsgToAllEdges(DoubleWritable msg) {
+    public final void sendMsgToAllEdges(final DoubleWritable msg) {
         if (msg == null) {
             throw new IllegalArgumentException(
-                    "sendMsgToAllEdges: Cannot send null message to all edges");
-        }
-        LongWritable destVertex = new LongWritable();
-        for (long destVertexId : verticesWithEdgeValues.keys().elements()) {
-            destVertex.set(destVertexId);
-            sendMsg(destVertex, msg);
+                "sendMsgToAllEdges: Cannot send null message to all edges");
         }
+        final LongWritable destVertex = new LongWritable();
+        final MutableVertex<LongWritable, DoubleWritable, FloatWritable,
+            DoubleWritable> vertex = this;
+        verticesWithEdgeValues.forEachKey(new LongProcedure() {
+            @Override
+            public boolean apply(long destVertexId) {
+                destVertex.set(destVertexId);
+                vertex.sendMsg(destVertex, msg);
+                return true;
+            }
+        });
     }
 
     @Override
@@ -144,10 +161,11 @@ public abstract class LongDoubleFloatDou
     @Override
     public Iterator<LongWritable> iterator() {
         final long[] destVertices = verticesWithEdgeValues.keys().elements();
+        final int destVerticesSize = verticesWithEdgeValues.size();
         return new Iterator<LongWritable>() {
             int offset = 0;
             @Override public boolean hasNext() {
-                return offset < destVertices.length;
+                return offset < destVerticesSize;
             }
 
             @Override public LongWritable next() {
@@ -156,7 +174,7 @@ public abstract class LongDoubleFloatDou
 
             @Override public void remove() {
                 throw new UnsupportedOperationException(
-                        "Mutation disallowed for edge list via iterator");
+                    "Mutation disallowed for edge list via iterator");
             }
         };
     }
@@ -183,7 +201,8 @@ public abstract class LongDoubleFloatDou
     }
 
     @Override
-    public void addVertexRequest(MutableVertex<LongWritable, DoubleWritable, FloatWritable, DoubleWritable> vertex)
+    public void addVertexRequest(MutableVertex<LongWritable, DoubleWritable,
+            FloatWritable, DoubleWritable> vertex)
             throws IOException {
         getGraphState().getWorkerCommunications().addVertexReq(vertex);
     }
@@ -197,12 +216,14 @@ public abstract class LongDoubleFloatDou
     public void addEdgeRequest(LongWritable vertexIndex,
                                Edge<LongWritable, FloatWritable> edge)
                                throws IOException {
-        getGraphState().getWorkerCommunications().addEdgeReq(vertexIndex, edge);
+        getGraphState().getWorkerCommunications().addEdgeReq(vertexIndex,
+                                                             edge);
     }
 
     @Override
     public void removeEdgeRequest(LongWritable sourceVertexId,
-                                  LongWritable destVertexId) throws IOException {
+                                  LongWritable destVertexId)
+                                  throws IOException {
         getGraphState().getWorkerCommunications().removeEdgeReq(
             sourceVertexId, destVertexId);
     }
@@ -225,19 +246,36 @@ public abstract class LongDoubleFloatDou
     }
 
     @Override
-    public final void write(DataOutput out) throws IOException {
+    public final void write(final DataOutput out) throws IOException {
         out.writeLong(vertexId);
         out.writeDouble(vertexValue);
         out.writeLong(verticesWithEdgeValues.size());
-        for(long destVertexId : verticesWithEdgeValues.keys().elements()) {
-            float edgeValue = verticesWithEdgeValues.get(destVertexId);
-            out.writeLong(destVertexId);
-            out.writeFloat(edgeValue);
-        }
+        verticesWithEdgeValues.forEachPair(new LongFloatProcedure() {
+            @Override
+            public boolean apply(long destVertexId, float edgeValue) {
+                try {
+                    out.writeLong(destVertexId);
+                    out.writeFloat(edgeValue);
+                } catch (IOException e) {
+                    throw new IllegalStateException(
+                        "apply: IOException when not allowed", e);
+                }
+                return true;
+            }
+        });
         out.writeLong(messageList.size());
-        for(double msg : messageList.elements()) {
-            out.writeDouble(msg);
-        }
+        messageList.forEach(new DoubleProcedure() {
+             @Override
+             public boolean apply(double message) {
+                 try {
+                     out.writeDouble(message);
+                 } catch (IOException e) {
+                     throw new IllegalStateException(
+                         "apply: IOException when not allowed", e);
+                 }
+                 return true;
+             }
+        });
         out.writeBoolean(halt);
     }
 
@@ -271,36 +309,35 @@ public abstract class LongDoubleFloatDou
 
         private final DoubleArrayList elementList;
 
-        public UnmodifiableDoubleWritableIterable(DoubleArrayList elementList) {
+        public UnmodifiableDoubleWritableIterable(
+                DoubleArrayList elementList) {
             this.elementList = elementList;
         }
 
         @Override
         public Iterator<DoubleWritable> iterator() {
             return new UnmodifiableDoubleWritableIterator(
-                    elementList.elements());
+                    elementList);
         }
     }
 
     private class UnmodifiableDoubleWritableIterator
             extends UnmodifiableIterator<DoubleWritable> {
+        private final DoubleArrayList elementList;
+        private int offset = 0;
 
-        private final double[] elements;
-        private int offset;
-
-        UnmodifiableDoubleWritableIterator(double[] elements) {
-            offset = 0;
-            this.elements = elements;
+        UnmodifiableDoubleWritableIterator(DoubleArrayList elementList) {
+            this.elementList = elementList;
         }
 
         @Override
         public boolean hasNext() {
-            return offset < elements.length;
+            return offset < elementList.size();
         }
 
         @Override
         public DoubleWritable next() {
-            return new DoubleWritable(elements[offset++]);
+            return new DoubleWritable(elementList.get(offset++));
         }
     }
 }