You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by er...@apache.org on 2012/10/14 10:08:10 UTC
svn commit: r1398032 - in /giraph/trunk: ./
giraph/src/main/java/org/apache/giraph/comm/
giraph/src/main/java/org/apache/giraph/comm/netty/
giraph/src/main/java/org/apache/giraph/graph/
Author: ereisman
Date: Sun Oct 14 08:08:09 2012
New Revision: 1398032
URL: http://svn.apache.org/viewvc?rev=1398032&view=rev
Log:
GIRAPH-367: Expose WorkerInfo to clients (Nitay Joffe via ereisman)
Modified:
giraph/trunk/CHANGELOG
giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/WorkerClient.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClient.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientServer.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/Vertex.java
Modified: giraph/trunk/CHANGELOG
URL: http://svn.apache.org/viewvc/giraph/trunk/CHANGELOG?rev=1398032&r1=1398031&r2=1398032&view=diff
==============================================================================
--- giraph/trunk/CHANGELOG (original)
+++ giraph/trunk/CHANGELOG Sun Oct 14 08:08:09 2012
@@ -1,6 +1,7 @@
Giraph Change Log
Release 0.2.0 - unreleased
+ GIRAPH-367: Expose WorkerInfo to clients (Nitay Joffe via ereisman)
GIRAPH-370: AccumuloVertexOutputFormat public visibility for
TABLE_NAME. (bfem via aching)
Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/WorkerClient.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/WorkerClient.java?rev=1398032&r1=1398031&r2=1398032&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/WorkerClient.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/WorkerClient.java Sun Oct 14 08:08:09 2012
@@ -23,6 +23,7 @@ import org.apache.giraph.graph.Edge;
import org.apache.giraph.graph.WorkerInfo;
import org.apache.giraph.graph.partition.Partition;
+import org.apache.giraph.graph.partition.PartitionOwner;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
@@ -62,6 +63,13 @@ else[HADOOP_NON_SECURE]*/
void fixPartitionIdToSocketAddrMap();
/**
+ * Lookup PartitionOwner for a vertex.
+ * @param vertexId id to look up.
+ * @return PartitionOwner holding the vertex.
+ */
+ PartitionOwner getVertexPartitionOwner(I vertexId);
+
+ /**
* Sends a message to destination vertex.
*
* @param destVertexId Destination vertex id.
Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClient.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClient.java?rev=1398032&r1=1398031&r2=1398032&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClient.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClient.java Sun Oct 14 08:08:09 2012
@@ -239,9 +239,18 @@ public class NettyWorkerClient<I extends
}
}
+ /**
+ * Lookup PartitionOwner for a vertex.
+ * @param vertexId id to look up.
+ * @return PartitionOwner holding the vertex.
+ */
+ public PartitionOwner getVertexPartitionOwner(I vertexId) {
+ return service.getVertexPartitionOwner(vertexId);
+ }
+
@Override
public void sendMessageRequest(I destVertexId, M message) {
- PartitionOwner owner = service.getVertexPartitionOwner(destVertexId);
+ PartitionOwner owner = getVertexPartitionOwner(destVertexId);
WorkerInfo workerInfo = owner.getWorkerInfo();
final int partitionId = owner.getPartitionId();
if (LOG.isTraceEnabled()) {
@@ -342,8 +351,7 @@ public class NettyWorkerClient<I extends
@Override
public void addEdgeRequest(I vertexIndex, Edge<I, E> edge) throws
IOException {
- PartitionOwner partitionOwner =
- service.getVertexPartitionOwner(vertexIndex);
+ PartitionOwner partitionOwner = getVertexPartitionOwner(vertexIndex);
int partitionId = partitionOwner.getPartitionId();
if (LOG.isTraceEnabled()) {
LOG.trace("addEdgeRequest: Sending edge " + edge + " for index " +
@@ -361,8 +369,7 @@ public class NettyWorkerClient<I extends
@Override
public void removeEdgeRequest(I vertexIndex,
I destinationVertexIndex) throws IOException {
- PartitionOwner partitionOwner =
- service.getVertexPartitionOwner(vertexIndex);
+ PartitionOwner partitionOwner = getVertexPartitionOwner(vertexIndex);
int partitionId = partitionOwner.getPartitionId();
if (LOG.isTraceEnabled()) {
LOG.trace("removeEdgeRequest: Removing edge " +
@@ -381,8 +388,7 @@ public class NettyWorkerClient<I extends
@Override
public void addVertexRequest(Vertex<I, V, E, M> vertex) throws IOException {
- PartitionOwner partitionOwner =
- service.getVertexPartitionOwner(vertex.getId());
+ PartitionOwner partitionOwner = getVertexPartitionOwner(vertex.getId());
int partitionId = partitionOwner.getPartitionId();
if (LOG.isTraceEnabled()) {
LOG.trace("addVertexRequest: Sending vertex " + vertex +
@@ -399,8 +405,7 @@ public class NettyWorkerClient<I extends
@Override
public void removeVertexRequest(I vertexIndex) throws IOException {
- PartitionOwner partitionOwner =
- service.getVertexPartitionOwner(vertexIndex);
+ PartitionOwner partitionOwner = getVertexPartitionOwner(vertexIndex);
int partitionId = partitionOwner.getPartitionId();
if (LOG.isTraceEnabled()) {
LOG.trace("removeVertexRequest: Removing vertex index " +
@@ -442,7 +447,7 @@ public class NettyWorkerClient<I extends
new SendPartitionMutationsRequest<I, V, E, M>(
entry.getKey(), entry.getValue());
PartitionOwner partitionOwner =
- service.getVertexPartitionOwner(
+ getVertexPartitionOwner(
entry.getValue().keySet().iterator().next());
InetSocketAddress remoteServerAddress =
getInetSocketAddress(partitionOwner.getWorkerInfo(),
Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientServer.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientServer.java?rev=1398032&r1=1398031&r2=1398032&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientServer.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientServer.java Sun Oct 14 08:08:09 2012
@@ -28,6 +28,7 @@ import org.apache.giraph.graph.Edge;
import org.apache.giraph.graph.Vertex;
import org.apache.giraph.graph.WorkerInfo;
import org.apache.giraph.graph.partition.Partition;
+import org.apache.giraph.graph.partition.PartitionOwner;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Mapper;
@@ -78,6 +79,11 @@ public class NettyWorkerClientServer<I e
}
@Override
+ public PartitionOwner getVertexPartitionOwner(I vertexId) {
+ return client.getVertexPartitionOwner(vertexId);
+ }
+
+ @Override
public void sendMessageRequest(I destVertexId, M message) {
client.sendMessageRequest(destVertexId, message);
}
Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/Vertex.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/Vertex.java?rev=1398032&r1=1398031&r2=1398032&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/Vertex.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/Vertex.java Sun Oct 14 08:08:09 2012
@@ -20,6 +20,8 @@ package org.apache.giraph.graph;
import org.apache.giraph.ImmutableClassesGiraphConfigurable;
import org.apache.giraph.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.comm.WorkerClientServer;
+import org.apache.giraph.graph.partition.PartitionOwner;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Mapper;
@@ -219,8 +221,41 @@ public abstract class Vertex<I extends W
throw new IllegalArgumentException(
"sendMessage: Cannot send null message to " + id);
}
- getGraphState().getWorkerCommunications().
- sendMessageRequest(id, message);
+ getWorkerCommunications().sendMessageRequest(id, message);
+ }
+
+ /**
+ * Lookup WorkerInfo for myself.
+ * @return WorkerInfo about worker holding this Vertex.
+ */
+ public WorkerInfo getMyWorkerInfo() {
+ return getVertexWorkerInfo(id);
+ }
+
+ /**
+ * Lookup WorkerInfo for a Vertex.
+ * @param vertexId VertexId to lookup
+ * @return WorkerInfo about worker holding this Vertex.
+ */
+ public WorkerInfo getVertexWorkerInfo(I vertexId) {
+ return getVertexPartitionOwner(vertexId).getWorkerInfo();
+ }
+
+ /**
+ * Lookup PartitionOwner for a Vertex
+ * @param vertexId id of Vertex to look up.
+ * @return PartitionOwner holding Vertex
+ */
+ private PartitionOwner getVertexPartitionOwner(I vertexId) {
+ return getWorkerCommunications().getVertexPartitionOwner(vertexId);
+ }
+
+ /**
+ * Get WorkerClientServer used to communicate with other servers.
+ * @return WorkerClientServer used.
+ */
+ private WorkerClientServer<I, V, E, M> getWorkerCommunications() {
+ return getGraphState().getWorkerCommunications();
}
/**