You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by er...@apache.org on 2012/10/14 10:08:10 UTC

svn commit: r1398032 - in /giraph/trunk: ./ giraph/src/main/java/org/apache/giraph/comm/ giraph/src/main/java/org/apache/giraph/comm/netty/ giraph/src/main/java/org/apache/giraph/graph/

Author: ereisman
Date: Sun Oct 14 08:08:09 2012
New Revision: 1398032

URL: http://svn.apache.org/viewvc?rev=1398032&view=rev
Log:
GIRAPH-367: Expose WorkerInfo to clients (Nitay Joffe via ereisman)

Modified:
    giraph/trunk/CHANGELOG
    giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/WorkerClient.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClient.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientServer.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/Vertex.java

Modified: giraph/trunk/CHANGELOG
URL: http://svn.apache.org/viewvc/giraph/trunk/CHANGELOG?rev=1398032&r1=1398031&r2=1398032&view=diff
==============================================================================
--- giraph/trunk/CHANGELOG (original)
+++ giraph/trunk/CHANGELOG Sun Oct 14 08:08:09 2012
@@ -1,6 +1,7 @@
 Giraph Change Log
 
 Release 0.2.0 - unreleased
+  GIRAPH-367: Expose WorkerInfo to clients (Nitay Joffe via ereisman)
 
   GIRAPH-370: AccumuloVertexOutputFormat public visibility for
   TABLE_NAME. (bfem via aching)

Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/WorkerClient.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/WorkerClient.java?rev=1398032&r1=1398031&r2=1398032&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/WorkerClient.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/WorkerClient.java Sun Oct 14 08:08:09 2012
@@ -23,6 +23,7 @@ import org.apache.giraph.graph.Edge;
 import org.apache.giraph.graph.WorkerInfo;
 import org.apache.giraph.graph.partition.Partition;
 
+import org.apache.giraph.graph.partition.PartitionOwner;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 
@@ -62,6 +63,13 @@ else[HADOOP_NON_SECURE]*/
   void fixPartitionIdToSocketAddrMap();
 
   /**
+   * Lookup PartitionOwner for a vertex.
+   * @param vertexId id to look up.
+   * @return PartitionOwner holding the vertex.
+   */
+  PartitionOwner getVertexPartitionOwner(I vertexId);
+
+  /**
    * Sends a message to destination vertex.
    *
    * @param destVertexId Destination vertex id.

Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClient.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClient.java?rev=1398032&r1=1398031&r2=1398032&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClient.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClient.java Sun Oct 14 08:08:09 2012
@@ -239,9 +239,18 @@ public class NettyWorkerClient<I extends
     }
   }
 
+  /**
+   * Lookup PartitionOwner for a vertex.
+   * @param vertexId id to look up.
+   * @return PartitionOwner holding the vertex.
+   */
+  public PartitionOwner getVertexPartitionOwner(I vertexId) {
+    return service.getVertexPartitionOwner(vertexId);
+  }
+
   @Override
   public void sendMessageRequest(I destVertexId, M message) {
-    PartitionOwner owner = service.getVertexPartitionOwner(destVertexId);
+    PartitionOwner owner = getVertexPartitionOwner(destVertexId);
     WorkerInfo workerInfo = owner.getWorkerInfo();
     final int partitionId = owner.getPartitionId();
     if (LOG.isTraceEnabled()) {
@@ -342,8 +351,7 @@ public class NettyWorkerClient<I extends
   @Override
   public void addEdgeRequest(I vertexIndex, Edge<I, E> edge) throws
       IOException {
-    PartitionOwner partitionOwner =
-        service.getVertexPartitionOwner(vertexIndex);
+    PartitionOwner partitionOwner = getVertexPartitionOwner(vertexIndex);
     int partitionId = partitionOwner.getPartitionId();
     if (LOG.isTraceEnabled()) {
       LOG.trace("addEdgeRequest: Sending edge " + edge + " for index " +
@@ -361,8 +369,7 @@ public class NettyWorkerClient<I extends
   @Override
   public void removeEdgeRequest(I vertexIndex,
                                 I destinationVertexIndex) throws IOException {
-    PartitionOwner partitionOwner =
-        service.getVertexPartitionOwner(vertexIndex);
+    PartitionOwner partitionOwner = getVertexPartitionOwner(vertexIndex);
     int partitionId = partitionOwner.getPartitionId();
     if (LOG.isTraceEnabled()) {
       LOG.trace("removeEdgeRequest: Removing edge " +
@@ -381,8 +388,7 @@ public class NettyWorkerClient<I extends
 
   @Override
   public void addVertexRequest(Vertex<I, V, E, M> vertex) throws IOException {
-    PartitionOwner partitionOwner =
-        service.getVertexPartitionOwner(vertex.getId());
+    PartitionOwner partitionOwner = getVertexPartitionOwner(vertex.getId());
     int partitionId = partitionOwner.getPartitionId();
     if (LOG.isTraceEnabled()) {
       LOG.trace("addVertexRequest: Sending vertex " + vertex +
@@ -399,8 +405,7 @@ public class NettyWorkerClient<I extends
 
   @Override
   public void removeVertexRequest(I vertexIndex) throws IOException {
-    PartitionOwner partitionOwner =
-        service.getVertexPartitionOwner(vertexIndex);
+    PartitionOwner partitionOwner = getVertexPartitionOwner(vertexIndex);
     int partitionId = partitionOwner.getPartitionId();
     if (LOG.isTraceEnabled()) {
       LOG.trace("removeVertexRequest: Removing vertex index " +
@@ -442,7 +447,7 @@ public class NettyWorkerClient<I extends
           new SendPartitionMutationsRequest<I, V, E, M>(
               entry.getKey(), entry.getValue());
       PartitionOwner partitionOwner =
-          service.getVertexPartitionOwner(
+          getVertexPartitionOwner(
               entry.getValue().keySet().iterator().next());
       InetSocketAddress remoteServerAddress =
           getInetSocketAddress(partitionOwner.getWorkerInfo(),

Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientServer.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientServer.java?rev=1398032&r1=1398031&r2=1398032&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientServer.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientServer.java Sun Oct 14 08:08:09 2012
@@ -28,6 +28,7 @@ import org.apache.giraph.graph.Edge;
 import org.apache.giraph.graph.Vertex;
 import org.apache.giraph.graph.WorkerInfo;
 import org.apache.giraph.graph.partition.Partition;
+import org.apache.giraph.graph.partition.PartitionOwner;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.Mapper;
@@ -78,6 +79,11 @@ public class NettyWorkerClientServer<I e
   }
 
   @Override
+  public PartitionOwner getVertexPartitionOwner(I vertexId) {
+    return client.getVertexPartitionOwner(vertexId);
+  }
+
+  @Override
   public void sendMessageRequest(I destVertexId, M message) {
     client.sendMessageRequest(destVertexId, message);
   }

Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/Vertex.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/Vertex.java?rev=1398032&r1=1398031&r2=1398032&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/Vertex.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/Vertex.java Sun Oct 14 08:08:09 2012
@@ -20,6 +20,8 @@ package org.apache.giraph.graph;
 
 import org.apache.giraph.ImmutableClassesGiraphConfigurable;
 import org.apache.giraph.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.comm.WorkerClientServer;
+import org.apache.giraph.graph.partition.PartitionOwner;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.Mapper;
@@ -219,8 +221,41 @@ public abstract class Vertex<I extends W
       throw new IllegalArgumentException(
           "sendMessage: Cannot send null message to " + id);
     }
-    getGraphState().getWorkerCommunications().
-        sendMessageRequest(id, message);
+    getWorkerCommunications().sendMessageRequest(id, message);
+  }
+
+  /**
+   * Lookup WorkerInfo for myself.
+   * @return WorkerInfo about worker holding this Vertex.
+   */
+  public WorkerInfo getMyWorkerInfo() {
+    return getVertexWorkerInfo(id);
+  }
+
+  /**
+   * Lookup WorkerInfo for a Vertex.
+   * @param vertexId VertexId to lookup
+   * @return WorkerInfo about worker holding this Vertex.
+   */
+  public WorkerInfo getVertexWorkerInfo(I vertexId) {
+    return getVertexPartitionOwner(vertexId).getWorkerInfo();
+  }
+
+  /**
+   * Lookup PartitionOwner for a Vertex
+   * @param vertexId id of Vertex to look up.
+   * @return PartitionOwner holding Vertex
+   */
+  private PartitionOwner getVertexPartitionOwner(I vertexId) {
+    return getWorkerCommunications().getVertexPartitionOwner(vertexId);
+  }
+
+  /**
+   * Get WorkerClientServer used to communicate with other servers.
+   * @return WorkerClientServer used.
+   */
+  private WorkerClientServer<I, V, E, M> getWorkerCommunications() {
+    return getGraphState().getWorkerCommunications();
   }
 
   /**