You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ma...@apache.org on 2012/11/13 21:03:11 UTC

svn commit: r1408926 - in /giraph/trunk: ./ giraph/src/main/java/org/apache/giraph/bsp/ giraph/src/main/java/org/apache/giraph/comm/ giraph/src/main/java/org/apache/giraph/comm/netty/ giraph/src/main/java/org/apache/giraph/comm/netty/handler/ giraph/sr...

Author: maja
Date: Tue Nov 13 20:03:07 2012
New Revision: 1408926

URL: http://svn.apache.org/viewvc?rev=1408926&view=rev
Log:
GIRAPH-386: ClassCastException when giraph.SplitMasterWorker=false

Added:
    giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/MasterInfo.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/TaskInfo.java
Removed:
    giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/MasterClientServer.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyMasterClientServer.java
Modified:
    giraph/trunk/CHANGELOG
    giraph/trunk/giraph/src/main/java/org/apache/giraph/bsp/CentralizedServiceMaster.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/WorkerClient.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/WorkerServer.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyClient.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyMasterClient.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyMasterServer.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyServer.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClient.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/handler/ClientRequestId.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/handler/MasterRequestServerHandler.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/handler/RequestServerHandler.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/handler/WorkerRequestServerHandler.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/AddressesAndPartitionsWritable.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspService.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspServiceMaster.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspServiceWorker.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/MasterAggregatorHandler.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/WorkerInfo.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/zk/ZooKeeperManager.java
    giraph/trunk/giraph/src/test/java/org/apache/giraph/comm/ConnectionTest.java
    giraph/trunk/giraph/src/test/java/org/apache/giraph/comm/RequestFailureTest.java
    giraph/trunk/giraph/src/test/java/org/apache/giraph/comm/RequestTest.java
    giraph/trunk/giraph/src/test/java/org/apache/giraph/comm/SaslConnectionTest.java

Modified: giraph/trunk/CHANGELOG
URL: http://svn.apache.org/viewvc/giraph/trunk/CHANGELOG?rev=1408926&r1=1408925&r2=1408926&view=diff
==============================================================================
--- giraph/trunk/CHANGELOG (original)
+++ giraph/trunk/CHANGELOG Tue Nov 13 20:03:07 2012
@@ -1,6 +1,9 @@
 Giraph Change Log
 
 Release 0.2.0 - unreleased
+  GIRAPH-386: ClassCastException when giraph.SplitMasterWorker=false
+  (majakabiljo)
+
   GIRAPH-423: Allow overriding addEdge (apresta)
 
   GIRAPH-422: Setting the log level of the root logger to the same level

Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/bsp/CentralizedServiceMaster.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/bsp/CentralizedServiceMaster.java?rev=1408926&r1=1408925&r2=1408926&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/bsp/CentralizedServiceMaster.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/bsp/CentralizedServiceMaster.java Tue Nov 13 20:03:07 2012
@@ -21,6 +21,7 @@ package org.apache.giraph.bsp;
 import java.io.IOException;
 
 import org.apache.giraph.graph.MasterAggregatorHandler;
+import org.apache.giraph.graph.MasterInfo;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.zookeeper.KeeperException;
@@ -50,6 +51,13 @@ public interface CentralizedServiceMaste
   boolean becomeMaster();
 
   /**
+   * Get master information
+   *
+   * @return Master information
+   */
+  MasterInfo getMasterInfo();
+
+  /**
    * Create the {@link InputSplit} objects from the index range based on the
    * user-defined VertexInputFormat.  The {@link InputSplit} objects will
    * processed by the workers later on during the INPUT_SUPERSTEP.

Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java?rev=1408926&r1=1408925&r2=1408926&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java Tue Nov 13 20:03:07 2012
@@ -26,6 +26,7 @@ import org.apache.giraph.comm.ServerData
 import org.apache.giraph.comm.WorkerClient;
 import org.apache.giraph.graph.FinishedSuperstepStats;
 import org.apache.giraph.graph.GraphState;
+import org.apache.giraph.graph.MasterInfo;
 import org.apache.giraph.graph.VertexEdgeCount;
 import org.apache.giraph.graph.WorkerAggregatorHandler;
 import org.apache.giraph.graph.partition.PartitionStore;
@@ -197,7 +198,7 @@ public interface CentralizedServiceWorke
    *
    * @return Master info
    */
-  WorkerInfo getMasterInfo();
+  MasterInfo getMasterInfo();
 
   /**
    * Get the GraphMapper that this service is using.  Vertices need to know

Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/WorkerClient.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/WorkerClient.java?rev=1408926&r1=1408925&r2=1408926&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/WorkerClient.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/WorkerClient.java Tue Nov 13 20:03:07 2012
@@ -71,10 +71,10 @@ else[HADOOP_NON_SECURE]*/
   /**
    * Send a request to a remote server (should be already connected)
    *
-   * @param destWorkerId Destination worker id
+   * @param destTaskId Destination task id
    * @param request Request to send
    */
-  void sendWritableRequest(Integer destWorkerId, WritableRequest request);
+  void sendWritableRequest(Integer destTaskId, WritableRequest request);
 
   /**
    * Wait until all the outstanding requests are completed.

Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/WorkerServer.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/WorkerServer.java?rev=1408926&r1=1408925&r2=1408926&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/WorkerServer.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/WorkerServer.java Tue Nov 13 20:03:07 2012
@@ -23,6 +23,7 @@ import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 
 import java.io.Closeable;
+import java.net.InetSocketAddress;
 
 /**
  * Interface for message communication server.
@@ -37,11 +38,11 @@ public interface WorkerServer<I extends 
     V extends Writable, E extends Writable, M extends Writable>
     extends Closeable {
   /**
-   * Get the port
+   * Get server address
    *
-   * @return Port used by this server
+   * @return Address used by this server
    */
-  int getPort();
+  InetSocketAddress getMyAddress();
 
   /**
    * Prepare incoming messages for computation, and resolve mutation requests.

Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyClient.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyClient.java?rev=1408926&r1=1408925&r2=1408926&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyClient.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyClient.java Tue Nov 13 20:03:07 2012
@@ -48,7 +48,7 @@ import org.apache.giraph.comm.requests.R
 import org.apache.giraph.comm.requests.SaslTokenMessageRequest;
 /*end[HADOOP_NON_SECURE]*/
 import org.apache.giraph.comm.requests.WritableRequest;
-import org.apache.giraph.graph.WorkerInfo;
+import org.apache.giraph.graph.TaskInfo;
 import org.apache.giraph.utils.TimedLogger;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.log4j.Logger;
@@ -140,8 +140,8 @@ else[HADOOP_NON_SECURE]*/
   /** Address request id generator */
   private final AddressRequestIdGenerator addressRequestIdGenerator =
       new AddressRequestIdGenerator();
-  /** Client id */
-  private final int clientId;
+  /** Task info */
+  private final TaskInfo myTaskInfo;
   /** Maximum thread pool size */
   private final int maxPoolSize;
   /** Maximum number of attempts to resolve an address*/
@@ -156,10 +156,13 @@ else[HADOOP_NON_SECURE]*/
    *
    * @param context Context for progress
    * @param conf Configuration
+   * @param myTaskInfo Current task info
    */
   public NettyClient(Mapper<?, ?, ?, ?>.Context context,
-                     final ImmutableClassesGiraphConfiguration conf) {
+                     final ImmutableClassesGiraphConfiguration conf,
+                     TaskInfo myTaskInfo) {
     this.context = context;
+    this.myTaskInfo = myTaskInfo;
     this.channelsPerServer = conf.getInt(
         GiraphConfiguration.CHANNELS_PER_SERVER,
         GiraphConfiguration.DEFAULT_CHANNELS_PER_SERVER);
@@ -239,8 +242,6 @@ else[HADOOP_NON_SECURE]*/
         new ThreadFactoryBuilder().setNameFormat(
             "netty-client-worker-%d").build());
 
-    clientId = conf.getInt("mapred.task.partition", -1);
-
     // Configure the client.
     bootstrap = new ClientBootstrap(
         new NioClientSocketChannelFactory(
@@ -353,10 +354,10 @@ else[HADOOP_NON_SECURE]*/
    *
    * @param tasks Tasks to connect to (if haven't already connected)
    */
-  public void connectAllAddresses(Collection<WorkerInfo> tasks) {
+  public void connectAllAddresses(Collection<? extends TaskInfo> tasks) {
     List<ChannelFutureAddress> waitingConnectionList =
         Lists.newArrayListWithCapacity(tasks.size() * channelsPerServer);
-    for (WorkerInfo taskInfo : tasks) {
+    for (TaskInfo taskInfo : tasks) {
       context.progress();
       InetSocketAddress address = taskIdAddressMap.get(taskInfo.getTaskId());
       if (address == null ||
@@ -622,12 +623,12 @@ else[HADOOP_NON_SECURE]*/
   /**
    * Send a request to a remote server (should be already connected)
    *
-   * @param destWorkerId Destination worker id
+   * @param destTaskId Destination task id
    * @param request Request to send
    */
-  public void sendWritableRequest(Integer destWorkerId,
+  public void sendWritableRequest(Integer destTaskId,
       WritableRequest request) {
-    InetSocketAddress remoteServer = taskIdAddressMap.get(destWorkerId);
+    InetSocketAddress remoteServer = taskIdAddressMap.get(destTaskId);
     if (clientRequestIdRequestInfoMap.isEmpty()) {
       byteCounter.resetAll();
     }
@@ -642,11 +643,11 @@ else[HADOOP_NON_SECURE]*/
     Channel channel = getNextChannel(remoteServer);
     RequestInfo newRequestInfo = new RequestInfo(remoteServer, request);
     if (registerRequest) {
-      request.setClientId(clientId);
+      request.setClientId(myTaskInfo.getTaskId());
       request.setRequestId(
         addressRequestIdGenerator.getNextRequestId(remoteServer));
       ClientRequestId clientRequestId =
-        new ClientRequestId(destWorkerId, request.getRequestId());
+        new ClientRequestId(destTaskId, request.getRequestId());
       RequestInfo oldRequestInfo = clientRequestIdRequestInfoMap.putIfAbsent(
         clientRequestId, newRequestInfo);
       if (oldRequestInfo != null) {

Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyMasterClient.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyMasterClient.java?rev=1408926&r1=1408925&r2=1408926&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyMasterClient.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyMasterClient.java Tue Nov 13 20:03:07 2012
@@ -58,7 +58,8 @@ public class NettyMasterClient implement
   public NettyMasterClient(Mapper<?, ?, ?, ?>.Context context,
                            ImmutableClassesGiraphConfiguration configuration,
                            CentralizedServiceMaster<?, ?, ?, ?> service) {
-    this.nettyClient = new NettyClient(context, configuration);
+    this.nettyClient =
+        new NettyClient(context, configuration, service.getMasterInfo());
     this.service = service;
     this.progressable = context;
     maxBytesPerAggregatorRequest = configuration.getInt(

Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyMasterServer.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyMasterServer.java?rev=1408926&r1=1408925&r2=1408926&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyMasterServer.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyMasterServer.java Tue Nov 13 20:03:07 2012
@@ -19,9 +19,9 @@
 package org.apache.giraph.comm.netty;
 
 import org.apache.giraph.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.bsp.CentralizedServiceMaster;
 import org.apache.giraph.comm.netty.handler.MasterRequestServerHandler;
 import org.apache.giraph.comm.MasterServer;
-import org.apache.giraph.graph.MasterAggregatorHandler;
 
 import java.net.InetSocketAddress;
 
@@ -36,12 +36,13 @@ public class NettyMasterServer implement
    * Constructor
    *
    * @param conf Hadoop configuration
-   * @param aggregatorHandler Master aggregator handler
+   * @param service Centralized service
    */
   public NettyMasterServer(ImmutableClassesGiraphConfiguration conf,
-      MasterAggregatorHandler aggregatorHandler) {
+      CentralizedServiceMaster<?, ?, ?, ?> service) {
     nettyServer = new NettyServer(conf,
-        new MasterRequestServerHandler.Factory(aggregatorHandler));
+        new MasterRequestServerHandler.Factory(service.getAggregatorHandler()),
+        service.getMasterInfo());
     nettyServer.start();
   }
 

Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyServer.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyServer.java?rev=1408926&r1=1408925&r2=1408926&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyServer.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyServer.java Tue Nov 13 20:03:07 2012
@@ -39,6 +39,7 @@ import org.apache.giraph.comm.netty.hand
 import java.util.concurrent.TimeUnit;
 import org.apache.giraph.GiraphConfiguration;
 import org.apache.giraph.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.graph.TaskInfo;
 import org.apache.log4j.Logger;
 import org.jboss.netty.bootstrap.ServerBootstrap;
 import org.jboss.netty.channel.Channel;
@@ -87,6 +88,8 @@ else[HADOOP_NON_SECURE]*/
   private final String localHostname;
   /** Address of the server */
   private InetSocketAddress myAddress;
+  /** Current task info */
+  private TaskInfo myTaskInfo;
   /** Maximum number of threads */
   private final int maxPoolSize;
   /** TCP backlog */
@@ -122,15 +125,18 @@ else[HADOOP_NON_SECURE]*/
    *
    * @param conf Configuration to use
    * @param requestServerHandlerFactory Factory for request handlers
+   * @param myTaskInfo Current task info
    */
   public NettyServer(ImmutableClassesGiraphConfiguration conf,
-      RequestServerHandler.Factory requestServerHandlerFactory) {
+      RequestServerHandler.Factory requestServerHandlerFactory,
+      TaskInfo myTaskInfo) {
     this.conf = conf;
     this.requestServerHandlerFactory = requestServerHandlerFactory;
     /*if[HADOOP_NON_SECURE]
     else[HADOOP_NON_SECURE]*/
     this.saslServerHandlerFactory = new SaslServerHandler.Factory();
     /*end[HADOOP_NON_SECURE]*/
+    this.myTaskInfo = myTaskInfo;
     sendBufferSize = conf.getInt(
         GiraphConfiguration.SERVER_SEND_BUFFER_SIZE,
         GiraphConfiguration.DEFAULT_SERVER_SEND_BUFFER_SIZE);
@@ -196,12 +202,14 @@ else[HADOOP_NON_SECURE]*/
    *
    * @param conf Configuration to use
    * @param requestServerHandlerFactory Factory for request handlers
+   * @param myTaskInfo Current task info
    * @param saslServerHandlerFactory  Factory for SASL handlers
    */
   public NettyServer(ImmutableClassesGiraphConfiguration conf,
                      RequestServerHandler.Factory requestServerHandlerFactory,
+                     TaskInfo myTaskInfo,
                      SaslServerHandler.Factory saslServerHandlerFactory) {
-    this(conf, requestServerHandlerFactory);
+    this(conf, requestServerHandlerFactory, myTaskInfo);
     this.saslServerHandlerFactory = saslServerHandlerFactory;
   }
 /*end[HADOOP_NON_SECURE]*/
@@ -247,7 +255,7 @@ else[HADOOP_NON_SECURE]*/
               saslServerHandlerFactory.newHandler(conf),
               new AuthorizeServerHandler(),
               requestServerHandlerFactory.newHandler(workerRequestReservedMap,
-                  conf),
+                  conf, myTaskInfo),
               // Removed after authentication completes:
               new ResponseEncoder());
         } else {
@@ -263,7 +271,7 @@ else[HADOOP_NON_SECURE]*/
               new RequestDecoder(conf, byteCounter));
           pipeline.addLast("requestProcessor",
               requestServerHandlerFactory.newHandler(
-                  workerRequestReservedMap, conf));
+                  workerRequestReservedMap, conf, myTaskInfo));
           if (executionHandler != null) {
             pipeline.addAfter(handlerBeforeExecutionHandler,
                 "executionHandler", executionHandler);

Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClient.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClient.java?rev=1408926&r1=1408925&r2=1408926&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClient.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClient.java Tue Nov 13 20:03:07 2012
@@ -22,6 +22,7 @@ import org.apache.giraph.ImmutableClasse
 import org.apache.giraph.bsp.CentralizedServiceWorker;
 import org.apache.giraph.comm.WorkerClient;
 import org.apache.giraph.comm.requests.WritableRequest;
+import org.apache.giraph.graph.TaskInfo;
 import org.apache.giraph.graph.WorkerInfo;
 import org.apache.giraph.graph.partition.PartitionOwner;
 import org.apache.hadoop.io.Writable;
@@ -67,7 +68,8 @@ public class NettyWorkerClient<I extends
       Mapper<?, ?, ?, ?>.Context context,
       ImmutableClassesGiraphConfiguration<I, V, E, M> configuration,
       CentralizedServiceWorker<I, V, E, M> service) {
-    this.nettyClient = new NettyClient(context, configuration);
+    this.nettyClient =
+        new NettyClient(context, configuration, service.getWorkerInfo());
     this.conf = configuration;
     this.service = service;
   }
@@ -78,7 +80,7 @@ public class NettyWorkerClient<I extends
 
   @Override
   public void openConnections() {
-    List<WorkerInfo> addresses = Lists.newArrayListWithCapacity(
+    List<TaskInfo> addresses = Lists.newArrayListWithCapacity(
         service.getWorkerInfoList().size());
     for (WorkerInfo info : service.getWorkerInfoList()) {
       // No need to connect to myself
@@ -96,9 +98,9 @@ public class NettyWorkerClient<I extends
   }
 
   @Override
-  public void sendWritableRequest(Integer destWorkerId,
+  public void sendWritableRequest(Integer destTaskId,
                                   WritableRequest request) {
-    nettyClient.sendWritableRequest(destWorkerId, request);
+    nettyClient.sendWritableRequest(destTaskId, request);
   }
 
   @Override

Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java?rev=1408926&r1=1408925&r2=1408926&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java Tue Nov 13 20:03:07 2012
@@ -45,6 +45,7 @@ import org.apache.log4j.Logger;
 
 import com.google.common.collect.Sets;
 
+import java.net.InetSocketAddress;
 import java.util.Set;
 
 /**
@@ -89,7 +90,8 @@ public class NettyWorkerServer<I extends
         new ServerData<I, V, E, M>(conf, createMessageStoreFactory(), context);
 
     nettyServer = new NettyServer(conf,
-        new WorkerRequestServerHandler.Factory<I, V, E, M>(serverData));
+        new WorkerRequestServerHandler.Factory<I, V, E, M>(serverData),
+        service.getWorkerInfo());
     nettyServer.start();
   }
 
@@ -138,8 +140,8 @@ public class NettyWorkerServer<I extends
   }
 
   @Override
-  public int getPort() {
-    return nettyServer.getMyAddress().getPort();
+  public InetSocketAddress getMyAddress() {
+    return nettyServer.getMyAddress();
   }
 
   @Override

Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/handler/ClientRequestId.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/handler/ClientRequestId.java?rev=1408926&r1=1408925&r2=1408926&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/handler/ClientRequestId.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/handler/ClientRequestId.java Tue Nov 13 20:03:07 2012
@@ -21,27 +21,27 @@ package org.apache.giraph.comm.netty.han
 /**
  * Simple immutable object to use for tracking requests uniquely.  This
  * object is guaranteed to be unique for a given client (based on the
- * destination worker and the request).
+ * destination task and the request).
  */
 public class ClientRequestId {
-  /** Destination worker id */
-  private final int destinationWorkerId;
+  /** Destination task id */
+  private final int destinationTaskId;
   /** Request id */
   private final long requestId;
 
   /**
    * Constructor.
    *
-   * @param destinationWorkerId Destination worker id
+   * @param destinationTaskId Destination task id
    * @param requestId Request id
    */
-  public ClientRequestId(int destinationWorkerId, long requestId) {
-    this.destinationWorkerId = destinationWorkerId;
+  public ClientRequestId(int destinationTaskId, long requestId) {
+    this.destinationTaskId = destinationTaskId;
     this.requestId = requestId;
   }
 
-  public int getDestinationWorkerId() {
-    return destinationWorkerId;
+  public int getDestinationTaskId() {
+    return destinationTaskId;
   }
 
   public long getRequestId() {
@@ -50,7 +50,7 @@ public class ClientRequestId {
 
   @Override
   public int hashCode() {
-    return (29 * destinationWorkerId) + (int) (57 * requestId);
+    return (29 * destinationTaskId) + (int) (57 * requestId);
   }
 
   @Override
@@ -58,7 +58,7 @@ public class ClientRequestId {
     if (other instanceof ClientRequestId) {
       ClientRequestId otherObj = (ClientRequestId) other;
       if (otherObj.getRequestId() == requestId &&
-          otherObj.getDestinationWorkerId() == destinationWorkerId) {
+          otherObj.getDestinationTaskId() == destinationTaskId) {
         return true;
       }
     }
@@ -68,6 +68,6 @@ public class ClientRequestId {
 
   @Override
   public String toString() {
-    return "(destWorker=" + destinationWorkerId + ",reqId=" + requestId + ")";
+    return "(destTask=" + destinationTaskId + ",reqId=" + requestId + ")";
   }
 }

Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/handler/MasterRequestServerHandler.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/handler/MasterRequestServerHandler.java?rev=1408926&r1=1408925&r2=1408926&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/handler/MasterRequestServerHandler.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/handler/MasterRequestServerHandler.java Tue Nov 13 20:03:07 2012
@@ -18,9 +18,10 @@
 
 package org.apache.giraph.comm.netty.handler;
 
+import org.apache.giraph.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.comm.requests.MasterRequest;
 import org.apache.giraph.graph.MasterAggregatorHandler;
-import org.apache.hadoop.conf.Configuration;
+import org.apache.giraph.graph.TaskInfo;
 
 /** Handler for requests on master */
 public class MasterRequestServerHandler extends
@@ -33,12 +34,15 @@ public class MasterRequestServerHandler 
    *
    * @param workerRequestReservedMap Worker request reservation map
    * @param conf                     Configuration
+   * @param myTaskInfo               Current task info
    * @param aggregatorHandler        Master aggregator handler
    */
   public MasterRequestServerHandler(
-      WorkerRequestReservedMap workerRequestReservedMap, Configuration conf,
+      WorkerRequestReservedMap workerRequestReservedMap,
+      ImmutableClassesGiraphConfiguration conf,
+      TaskInfo myTaskInfo,
       MasterAggregatorHandler aggregatorHandler) {
-    super(workerRequestReservedMap, conf);
+    super(workerRequestReservedMap, conf, myTaskInfo);
     this.aggregatorHandler = aggregatorHandler;
   }
 
@@ -66,9 +70,10 @@ public class MasterRequestServerHandler 
     @Override
     public RequestServerHandler newHandler(
         WorkerRequestReservedMap workerRequestReservedMap,
-        Configuration conf) {
+        ImmutableClassesGiraphConfiguration conf,
+        TaskInfo myTaskInfo) {
       return new MasterRequestServerHandler(workerRequestReservedMap, conf,
-          aggregatorHandler);
+          myTaskInfo, aggregatorHandler);
     }
   }
 }

Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/handler/RequestServerHandler.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/handler/RequestServerHandler.java?rev=1408926&r1=1408925&r2=1408926&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/handler/RequestServerHandler.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/handler/RequestServerHandler.java Tue Nov 13 20:03:07 2012
@@ -19,11 +19,12 @@
 package org.apache.giraph.comm.netty.handler;
 
 import org.apache.giraph.GiraphConfiguration;
+import org.apache.giraph.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.comm.requests.WritableRequest;
+import org.apache.giraph.graph.TaskInfo;
 import org.apache.giraph.utils.SystemTime;
 import org.apache.giraph.utils.Time;
 import org.apache.giraph.utils.Times;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.log4j.Logger;
 import org.jboss.netty.buffer.ChannelBuffer;
 import org.jboss.netty.buffer.ChannelBuffers;
@@ -53,8 +54,8 @@ public abstract class RequestServerHandl
   private final boolean closeFirstRequest;
   /** Request reserved map (for exactly one semantics) */
   private final WorkerRequestReservedMap workerRequestReservedMap;
-  /** My worker id */
-  private final int myWorkerId;
+  /** My task info */
+  private final TaskInfo myTaskInfo;
   /** Start nanoseconds for the processing time */
   private long startProcessingNanoseconds = -1;
 
@@ -63,15 +64,17 @@ public abstract class RequestServerHandl
    *
    * @param workerRequestReservedMap Worker request reservation map
    * @param conf Configuration
+   * @param myTaskInfo Current task info
    */
   public RequestServerHandler(
       WorkerRequestReservedMap workerRequestReservedMap,
-      Configuration conf) {
+      ImmutableClassesGiraphConfiguration conf,
+      TaskInfo myTaskInfo) {
     this.workerRequestReservedMap = workerRequestReservedMap;
     closeFirstRequest = conf.getBoolean(
         GiraphConfiguration.NETTY_SIMULATE_FIRST_REQUEST_CLOSED,
         GiraphConfiguration.NETTY_SIMULATE_FIRST_REQUEST_CLOSED_DEFAULT);
-    myWorkerId = conf.getInt("mapred.task.partition", -1);
+    this.myTaskInfo = myTaskInfo;
   }
 
   @Override
@@ -120,7 +123,7 @@ public abstract class RequestServerHandl
 
     // Send the response with the request id
     ChannelBuffer buffer = ChannelBuffers.directBuffer(RESPONSE_BYTES);
-    buffer.writeInt(myWorkerId);
+    buffer.writeInt(myTaskInfo.getTaskId());
     buffer.writeLong(writableRequest.getRequestId());
     buffer.writeByte(alreadyDone);
     e.getChannel().write(buffer);
@@ -167,10 +170,12 @@ public abstract class RequestServerHandl
      *
      * @param workerRequestReservedMap Worker request reservation map
      * @param conf Configuration to use
+     * @param myTaskInfo Current task info
      * @return New {@link RequestServerHandler}
      */
     RequestServerHandler newHandler(
         WorkerRequestReservedMap workerRequestReservedMap,
-        Configuration conf);
+        ImmutableClassesGiraphConfiguration conf,
+        TaskInfo myTaskInfo);
   }
 }

Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/handler/WorkerRequestServerHandler.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/handler/WorkerRequestServerHandler.java?rev=1408926&r1=1408925&r2=1408926&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/handler/WorkerRequestServerHandler.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/handler/WorkerRequestServerHandler.java Tue Nov 13 20:03:07 2012
@@ -18,9 +18,10 @@
 
 package org.apache.giraph.comm.netty.handler;
 
+import org.apache.giraph.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.comm.ServerData;
 import org.apache.giraph.comm.requests.WorkerRequest;
-import org.apache.hadoop.conf.Configuration;
+import org.apache.giraph.graph.TaskInfo;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 
@@ -44,11 +45,13 @@ public class WorkerRequestServerHandler<
    * @param serverData               Data held by the server
    * @param workerRequestReservedMap Worker request reservation map
    * @param conf                     Configuration
+   * @param myTaskInfo               Current task info
    */
   public WorkerRequestServerHandler(ServerData<I, V, E, M> serverData,
       WorkerRequestReservedMap workerRequestReservedMap,
-      Configuration conf) {
-    super(workerRequestReservedMap, conf);
+      ImmutableClassesGiraphConfiguration conf,
+      TaskInfo myTaskInfo) {
+    super(workerRequestReservedMap, conf, myTaskInfo);
     this.serverData = serverData;
   }
 
@@ -76,9 +79,10 @@ public class WorkerRequestServerHandler<
     @Override
     public RequestServerHandler newHandler(
         WorkerRequestReservedMap workerRequestReservedMap,
-        Configuration conf) {
+        ImmutableClassesGiraphConfiguration conf,
+        TaskInfo myTaskInfo) {
       return new WorkerRequestServerHandler<I, V, E,
-          M>(serverData, workerRequestReservedMap, conf);
+          M>(serverData, workerRequestReservedMap, conf, myTaskInfo);
     }
   }
 }

Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/AddressesAndPartitionsWritable.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/AddressesAndPartitionsWritable.java?rev=1408926&r1=1408925&r2=1408926&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/AddressesAndPartitionsWritable.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/AddressesAndPartitionsWritable.java Tue Nov 13 20:03:07 2012
@@ -34,7 +34,7 @@ import java.util.List;
  */
 public class AddressesAndPartitionsWritable implements Writable {
   /** Master information */
-  private WorkerInfo masterInfo;
+  private MasterInfo masterInfo;
   /** List of all workers */
   private List<WorkerInfo> workerInfos;
   /** Collection of partitions */
@@ -49,7 +49,7 @@ public class AddressesAndPartitionsWrita
    * @param workerInfos List of all workers
    * @param partitionOwners Collection of partitions
    */
-  public AddressesAndPartitionsWritable(WorkerInfo masterInfo,
+  public AddressesAndPartitionsWritable(MasterInfo masterInfo,
       List<WorkerInfo> workerInfos,
       Collection<PartitionOwner> partitionOwners) {
     this.masterInfo = masterInfo;
@@ -72,7 +72,7 @@ public class AddressesAndPartitionsWrita
    *
    * @return Master information
    */
-  public WorkerInfo getMasterInfo() {
+  public MasterInfo getMasterInfo() {
     return masterInfo;
   }
 
@@ -111,7 +111,7 @@ public class AddressesAndPartitionsWrita
 
   @Override
   public void readFields(DataInput input) throws IOException {
-    masterInfo = new WorkerInfo();
+    masterInfo = new MasterInfo();
     masterInfo.readFields(input);
 
     int workerInfosSize = input.readInt();

Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspService.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspService.java?rev=1408926&r1=1408925&r2=1408926&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspService.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspService.java Tue Nov 13 20:03:07 2012
@@ -276,7 +276,7 @@ public abstract class BspService<I exten
     this.conf = new ImmutableClassesGiraphConfiguration<I, V, E, M>(
         context.getConfiguration());
     this.jobId = conf.get("mapred.job.id", "Unknown Job");
-    this.taskPartition = conf.getInt("mapred.task.partition", -1);
+    this.taskPartition = conf.getTaskPartition();
     this.restartedSuperstep = conf.getLong(
         GiraphConfiguration.RESTART_SUPERSTEP,
         UNSET_SUPERSTEP);

Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspServiceMaster.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspServiceMaster.java?rev=1408926&r1=1408925&r2=1408926&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspServiceMaster.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspServiceMaster.java Tue Nov 13 20:03:07 2012
@@ -24,8 +24,10 @@ import org.apache.giraph.bsp.Application
 import org.apache.giraph.bsp.BspInputFormat;
 import org.apache.giraph.bsp.CentralizedServiceMaster;
 import org.apache.giraph.bsp.SuperstepState;
-import org.apache.giraph.comm.MasterClientServer;
-import org.apache.giraph.comm.netty.NettyMasterClientServer;
+import org.apache.giraph.comm.MasterClient;
+import org.apache.giraph.comm.MasterServer;
+import org.apache.giraph.comm.netty.NettyMasterClient;
+import org.apache.giraph.comm.netty.NettyMasterServer;
 import org.apache.giraph.counters.GiraphStats;
 import org.apache.giraph.graph.GraphMapper.MapFunctions;
 import org.apache.giraph.graph.partition.MasterGraphPartitioner;
@@ -140,10 +142,12 @@ public class BspServiceMaster<I extends 
   private MasterAggregatorHandler aggregatorHandler;
   /** Master class */
   private MasterCompute masterCompute;
-  /** Communication service */
-  private MasterClientServer commService;
+  /** IPC Client */
+  private MasterClient masterClient;
+  /** IPC Server */
+  private MasterServer masterServer;
   /** Master info */
-  private WorkerInfo masterInfo;
+  private MasterInfo masterInfo;
   /** List of workers in current superstep */
   private List<WorkerInfo> chosenWorkerInfoList = Lists.newArrayList();
   /** Limit locality information added to each InputSplit znode */
@@ -786,10 +790,11 @@ public class BspServiceMaster<I extends 
               getContext());
           aggregatorHandler.initialize(this);
 
-          commService = new NettyMasterClientServer(
-              getContext(), getConfiguration(), this);
-          masterInfo = new WorkerInfo(getHostname(), getTaskPartition(),
-              commService.getMyAddress().getPort());
+          masterInfo = new MasterInfo();
+          masterServer = new NettyMasterServer(getConfiguration(), this);
+          masterInfo.setInetSocketAddress(masterServer.getMyAddress());
+          masterClient =
+              new NettyMasterClient(getContext(), getConfiguration(), this);
 
           if (LOG.isInfoEnabled()) {
             LOG.info("becomeMaster: I am now the master!");
@@ -810,6 +815,11 @@ public class BspServiceMaster<I extends 
     }
   }
 
+  @Override
+  public MasterInfo getMasterInfo() {
+    return masterInfo;
+  }
+
   /**
    * Collect and aggregate the worker statistics for a particular superstep.
    *
@@ -1350,7 +1360,7 @@ public class BspServiceMaster<I extends 
       }
     }
 
-    commService.openConnections();
+    masterClient.openConnections();
 
     GiraphStats.getInstance().
         getCurrentWorkers().setValue(chosenWorkerInfoList.size());
@@ -1361,7 +1371,7 @@ public class BspServiceMaster<I extends 
     // We need to finalize aggregators from previous superstep (send them to
     // worker owners) after new worker assignments
     if (getSuperstep() >= 0) {
-      aggregatorHandler.finishSuperstep(commService);
+      aggregatorHandler.finishSuperstep(masterClient);
     }
 
     // Finalize the valid checkpoint file prefixes and possibly
@@ -1406,7 +1416,7 @@ public class BspServiceMaster<I extends 
 
     // Collect aggregator values, then run the master.compute() and
     // finally save the aggregator values
-    aggregatorHandler.prepareSuperstep(commService);
+    aggregatorHandler.prepareSuperstep(masterClient);
     runMasterCompute(getSuperstep());
 
     // If the master is halted or all the vertices voted to halt and there
@@ -1614,8 +1624,8 @@ public class BspServiceMaster<I extends 
       }
       aggregatorHandler.close();
 
-      commService.closeConnections();
-      commService.close();
+      masterClient.closeConnections();
+      masterServer.close();
     }
 
     try {

Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspServiceWorker.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspServiceWorker.java?rev=1408926&r1=1408925&r2=1408926&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspServiceWorker.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspServiceWorker.java Tue Nov 13 20:03:07 2012
@@ -125,7 +125,7 @@ public class BspServiceWorker<I extends 
   private final WorkerAggregatorRequestProcessor
   workerAggregatorRequestProcessor;
   /** Master info */
-  private WorkerInfo masterInfo = new WorkerInfo();
+  private MasterInfo masterInfo = new MasterInfo();
   /** List of workers */
   private List<WorkerInfo> workerInfoList = Lists.newArrayList();
   /** Have the partition exchange children (workers) changed? */
@@ -170,17 +170,17 @@ public class BspServiceWorker<I extends 
     registerBspEvent(partitionExchangeChildrenChanged);
     workerGraphPartitioner =
         getGraphPartitionerFactory().createWorkerGraphPartitioner();
-    workerServer = new NettyWorkerServer<I, V, E, M>(getConfiguration(),
-        this, context);
-    workerClient = new NettyWorkerClient<I, V, E, M>(context,
-        getConfiguration(), this);
+    workerInfo = new WorkerInfo(getTaskPartition());
+    workerServer =
+        new NettyWorkerServer<I, V, E, M>(getConfiguration(), this, context);
+    workerInfo.setInetSocketAddress(workerServer.getMyAddress());
+    workerClient =
+        new NettyWorkerClient<I, V, E, M>(context, getConfiguration(), this);
 
     workerAggregatorRequestProcessor =
         new NettyWorkerAggregatorRequestProcessor(getContext(),
             getConfiguration(), this);
 
-    workerInfo = new WorkerInfo(
-        getHostname(), getTaskPartition(), workerServer.getPort());
     this.workerContext = getConfiguration().createWorkerContext(null);
 
     aggregatorHandler =
@@ -334,7 +334,7 @@ public class BspServiceWorker<I extends 
   }
 
   @Override
-  public WorkerInfo getMasterInfo() {
+  public MasterInfo getMasterInfo() {
     return masterInfo;
   }
 

Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/MasterAggregatorHandler.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/MasterAggregatorHandler.java?rev=1408926&r1=1408925&r2=1408926&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/MasterAggregatorHandler.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/MasterAggregatorHandler.java Tue Nov 13 20:03:07 2012
@@ -20,7 +20,7 @@ package org.apache.giraph.graph;
 
 import org.apache.giraph.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.bsp.SuperstepState;
-import org.apache.giraph.comm.MasterClientServer;
+import org.apache.giraph.comm.MasterClient;
 import org.apache.giraph.comm.aggregators.AggregatorUtils;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.util.Progressable;
@@ -145,9 +145,9 @@ public class MasterAggregatorHandler imp
   /**
    * Prepare aggregators for current superstep
    *
-   * @param commService Communication service
+   * @param masterClient IPC client on master
    */
-  public void prepareSuperstep(MasterClientServer commService) {
+  public void prepareSuperstep(MasterClient masterClient) {
     if (LOG.isDebugEnabled()) {
       LOG.debug("prepareSuperstep: Start preapring aggregators");
     }
@@ -169,9 +169,9 @@ public class MasterAggregatorHandler imp
   /**
    * Finalize aggregators for current superstep and share them with workers
    *
-   * @param commService Communication service
+   * @param masterClient IPC client on master
    */
-  public void finishSuperstep(MasterClientServer commService) {
+  public void finishSuperstep(MasterClient masterClient) {
     if (LOG.isDebugEnabled()) {
       LOG.debug("finishSuperstep: Start finishing aggregators");
     }
@@ -192,12 +192,12 @@ public class MasterAggregatorHandler imp
     try {
       for (Map.Entry<String, AggregatorWrapper<Writable>> entry :
           aggregatorMap.entrySet()) {
-        commService.sendAggregator(entry.getKey(),
+        masterClient.sendAggregator(entry.getKey(),
             entry.getValue().getAggregatorClass(),
             entry.getValue().getPreviousAggregatedValue());
         progressable.progress();
       }
-      commService.finishSendingAggregatedValues();
+      masterClient.finishSendingAggregatedValues();
     } catch (IOException e) {
       throw new IllegalStateException("finishSuperstep: " +
           "IOException occurred while sending aggregators", e);

Added: giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/MasterInfo.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/MasterInfo.java?rev=1408926&view=auto
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/MasterInfo.java (added)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/MasterInfo.java Tue Nov 13 20:03:07 2012
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.graph;
+
+/**
+ * Information about the master (IPC hostname and port) that is sent to workers.
+ */
+public class MasterInfo extends TaskInfo {
+  /**
+   * Constructor, hostname and port stay unset until an address is provided
+   */
+  public MasterInfo() {
+  }
+
+  @Override
+  public int getTaskId() {
+    return -1; // the master always reports -1 as its task id
+  }
+
+  @Override
+  public String toString() {
+    return "Master(hostname=" + getHostname() + ", port=" + getPort() + ")";
+  }
+}

Added: giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/TaskInfo.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/TaskInfo.java?rev=1408926&view=auto
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/TaskInfo.java (added)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/TaskInfo.java Tue Nov 13 20:03:07 2012
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.graph;
+
+import org.apache.hadoop.io.Writable;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+/**
+ * Abstract holder of the IPC hostname and port of a task - worker or master.
+ */
+public abstract class TaskInfo implements Writable {
+  /** Task hostname, null until an address is set or fields are read */
+  private String hostname;
+  /** Port that the IPC server is using, 0 until an address is set or read */
+  private int port;
+
+  /**
+   * Constructor, leaves hostname and port unset
+   */
+  public TaskInfo() {
+  }
+
+  /**
+   * Get this task's hostname
+   *
+   * @return Hostname, or null if it has not been set or read yet
+   */
+  public String getHostname() {
+    return hostname;
+  }
+
+  /**
+   * Get port that the IPC server of this task is using
+   *
+   * @return Port, or 0 if it has not been set or read yet
+   */
+  public int getPort() {
+    return port;
+  }
+
+  /**
+   * Store the host name and port of the address the IPC server is using
+   *
+   * @param address Address whose host name and port are copied into this task
+   */
+  public void setInetSocketAddress(InetSocketAddress address) {
+    this.port = address.getPort();
+    this.hostname = address.getHostName();
+  }
+
+  /**
+   * Get a new instance of the InetSocketAddress for this hostname and port
+   *
+   * @return Newly created InetSocketAddress built from the hostname and port.
+   */
+  public InetSocketAddress getInetSocketAddress() {
+    return new InetSocketAddress(hostname, port);
+  }
+
+  /**
+   * Get task partition id of this task, as defined by the concrete subclass
+   *
+   * @return Task partition id of this task
+   */
+  public abstract int getTaskId();
+
+  @Override
+  public boolean equals(Object other) {
+    if (other instanceof TaskInfo) {
+      TaskInfo taskInfo = (TaskInfo) other;
+      if (hostname.equals(taskInfo.getHostname()) && // NPE if hostname unset
+          (getTaskId() == taskInfo.getTaskId()) &&
+          (port == taskInfo.getPort())) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  @Override
+  public void readFields(DataInput input) throws IOException {
+    hostname = input.readUTF();
+    port = input.readInt();
+  }
+
+  @Override
+  public void write(DataOutput output) throws IOException {
+    output.writeUTF(hostname);
+    output.writeInt(port);
+  }
+
+  @Override
+  public int hashCode() {
+    int result = 17;
+    result = 37 * result + getPort();
+    result = 37 * result + hostname.hashCode(); // NPE if hostname unset
+    result = 37 * result + getTaskId();
+    return result;
+  }
+}

Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/WorkerInfo.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/WorkerInfo.java?rev=1408926&r1=1408925&r2=1408926&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/WorkerInfo.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/WorkerInfo.java Tue Nov 13 20:03:07 2012
@@ -23,18 +23,12 @@ import java.io.DataOutput;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 
-import org.apache.hadoop.io.Writable;
-
 /**
  * Information about a worker that is sent to the master and other workers.
  */
-public class WorkerInfo implements Writable {
-  /** Worker hostname */
-  private String hostname;
-  /** Task Partition (Worker) ID of this worker */
-  private int taskId = -1;
-  /** Port that the IPC server is using */
-  private int port = -1;
+public class WorkerInfo extends TaskInfo {
+  /** Task Partition (Worker) ID of this task */
+  private int taskId;
   /** Hostname + "_" + id for easier debugging */
   private String hostnameId;
 
@@ -47,92 +41,43 @@ public class WorkerInfo implements Writa
   /**
    * Constructor with parameters.
    *
-   * @param hostname Hostname of this worker.
    * @param taskId the task partition for this worker
-   * @param port Port of the service.
    */
-  public WorkerInfo(String hostname, int taskId, int port) {
-    this.hostname = hostname;
+  public WorkerInfo(int taskId) {
     this.taskId = taskId;
-    this.port = port;
-    this.hostnameId = hostname + "_" + taskId;
-  }
-
-  /**
-   * Constructor with InetSocketAddress
-   *
-   * @param address Address of this worker
-   * @param taskId The task partition for this worker
-   */
-  public WorkerInfo(InetSocketAddress address, int taskId) {
-    this(address.getHostName(), taskId, address.getPort());
-  }
-
-  public String getHostname() {
-    return hostname;
   }
 
+  @Override
   public int getTaskId() {
     return taskId;
   }
 
-  public String getHostnameId() {
-    return hostnameId;
-  }
-
-  /**
-   * Get a new instance of the InetSocketAddress for this hostname and port
-   *
-   * @return InetSocketAddress of the hostname and port.
-   */
-  public InetSocketAddress getInetSocketAddress() {
-    return new InetSocketAddress(hostname, port);
-  }
-
-  public int getPort() {
-    return port;
-  }
-
   @Override
-  public boolean equals(Object other) {
-    if (other instanceof WorkerInfo) {
-      WorkerInfo workerInfo = (WorkerInfo) other;
-      if (hostname.equals(workerInfo.getHostname()) &&
-          (taskId == workerInfo.getTaskId()) &&
-          (port == workerInfo.getPort())) {
-        return true;
-      }
-    }
-    return false;
+  public void setInetSocketAddress(InetSocketAddress address) {
+    super.setInetSocketAddress(address);
+    hostnameId = getHostname() + "_" + getTaskId();
   }
 
-  @Override
-  public int hashCode() {
-    int result = 17;
-    result = 37 * result + port;
-    result = 37 * result + hostname.hashCode();
-    result = 37 * result + taskId;
-    return result;
+  public String getHostnameId() {
+    return hostnameId;
   }
 
   @Override
   public String toString() {
-    return "Worker(hostname=" + hostname + ", MRtaskID=" +
-        taskId + ", port=" + port + ")";
+    return "Worker(hostname=" + getHostname() + ", MRtaskID=" +
+        getTaskId() + ", port=" + getPort() + ")";
   }
 
   @Override
   public void readFields(DataInput input) throws IOException {
-    hostname = input.readUTF();
+    super.readFields(input);
     taskId = input.readInt();
-    port = input.readInt();
-    hostnameId = hostname + "_" + taskId;
+    hostnameId = getHostname() + "_" + getTaskId();
   }
 
   @Override
   public void write(DataOutput output) throws IOException {
-    output.writeUTF(hostname);
+    super.write(output);
     output.writeInt(taskId);
-    output.writeInt(port);
   }
 }

Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/zk/ZooKeeperManager.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/zk/ZooKeeperManager.java?rev=1408926&r1=1408925&r2=1408926&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/zk/ZooKeeperManager.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/zk/ZooKeeperManager.java Tue Nov 13 20:03:07 2012
@@ -138,7 +138,7 @@ public class ZooKeeperManager {
     throws IOException {
     this.context = context;
     this.conf = configuration;
-    taskPartition = conf.getInt("mapred.task.partition", -1);
+    taskPartition = conf.getTaskPartition();
     jobId = conf.get("mapred.job.id", "Unknown Job");
     baseDirectory =
         new Path(conf.get(GiraphConfiguration.ZOOKEEPER_MANAGER_DIRECTORY,

Modified: giraph/trunk/giraph/src/test/java/org/apache/giraph/comm/ConnectionTest.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/test/java/org/apache/giraph/comm/ConnectionTest.java?rev=1408926&r1=1408925&r2=1408926&view=diff
==============================================================================
--- giraph/trunk/giraph/src/test/java/org/apache/giraph/comm/ConnectionTest.java (original)
+++ giraph/trunk/giraph/src/test/java/org/apache/giraph/comm/ConnectionTest.java Tue Nov 13 20:03:07 2012
@@ -74,15 +74,16 @@ public class ConnectionTest {
 
     ServerData<IntWritable, IntWritable, IntWritable, IntWritable> serverData =
         MockUtils.createNewServerData(conf, context);
+    WorkerInfo workerInfo = new WorkerInfo(-1);
     NettyServer server =
         new NettyServer(conf,
-            new WorkerRequestServerHandler.Factory(serverData));
+            new WorkerRequestServerHandler.Factory(serverData), workerInfo);
     server.start();
+    workerInfo.setInetSocketAddress(server.getMyAddress());
 
-    NettyClient client = new NettyClient(context, conf);
+    NettyClient client = new NettyClient(context, conf, new WorkerInfo());
     client.connectAllAddresses(
-        Lists.<WorkerInfo>newArrayList(
-            new WorkerInfo(server.getMyAddress(), -1)));
+        Lists.<WorkerInfo>newArrayList(workerInfo));
 
     client.stop();
     server.stop();
@@ -104,17 +105,25 @@ public class ConnectionTest {
    RequestServerHandler.Factory requestServerHandlerFactory =
        new WorkerRequestServerHandler.Factory(serverData);
 
-    NettyServer server1 = new NettyServer(conf, requestServerHandlerFactory);
+    WorkerInfo workerInfo1 = new WorkerInfo(1);
+    NettyServer server1 =
+        new NettyServer(conf, requestServerHandlerFactory, workerInfo1);
     server1.start();
-    NettyServer server2 = new NettyServer(conf, requestServerHandlerFactory);
+    workerInfo1.setInetSocketAddress(server1.getMyAddress());
+
+    WorkerInfo workerInfo2 = new WorkerInfo(2);
+    NettyServer server2 =
+        new NettyServer(conf, requestServerHandlerFactory, workerInfo2);
     server2.start();
-    NettyServer server3 = new NettyServer(conf, requestServerHandlerFactory);
+    workerInfo2.setInetSocketAddress(server2.getMyAddress());
+
+    WorkerInfo workerInfo3 = new WorkerInfo(3);
+    NettyServer server3 =
+        new NettyServer(conf, requestServerHandlerFactory, workerInfo3);
     server3.start();
+    workerInfo3.setInetSocketAddress(server3.getMyAddress());
 
-    NettyClient client = new NettyClient(context, conf);
-    WorkerInfo workerInfo1 = new WorkerInfo(server1.getMyAddress(), 1);
-    WorkerInfo workerInfo2 = new WorkerInfo(server2.getMyAddress(), 2);
-    WorkerInfo workerInfo3 = new WorkerInfo(server3.getMyAddress(), 3);
+    NettyClient client = new NettyClient(context, conf, new WorkerInfo());
     List<WorkerInfo> addresses = Lists.<WorkerInfo>newArrayList(workerInfo1,
         workerInfo2, workerInfo3);
     client.connectAllAddresses(addresses);
@@ -138,18 +147,18 @@ public class ConnectionTest {
 
     ServerData<IntWritable, IntWritable, IntWritable, IntWritable> serverData =
         MockUtils.createNewServerData(conf, context);
+    WorkerInfo workerInfo = new WorkerInfo(-1);
     NettyServer server = new NettyServer(conf,
-        new WorkerRequestServerHandler.Factory(serverData));
+        new WorkerRequestServerHandler.Factory(serverData), workerInfo);
     server.start();
+    workerInfo.setInetSocketAddress(server.getMyAddress());
 
-    List<WorkerInfo> addresses =
-        Lists.<WorkerInfo>newArrayList(
-            new WorkerInfo(server.getMyAddress(), -1));
-    NettyClient client1 = new NettyClient(context, conf);
+    List<WorkerInfo> addresses = Lists.<WorkerInfo>newArrayList(workerInfo);
+    NettyClient client1 = new NettyClient(context, conf, new WorkerInfo());
     client1.connectAllAddresses(addresses);
-    NettyClient client2 = new NettyClient(context, conf);
+    NettyClient client2 = new NettyClient(context, conf, new WorkerInfo());
     client2.connectAllAddresses(addresses);
-    NettyClient client3 = new NettyClient(context, conf);
+    NettyClient client3 = new NettyClient(context, conf, new WorkerInfo());
     client3.connectAllAddresses(addresses);
 
     client1.stop();

Modified: giraph/trunk/giraph/src/test/java/org/apache/giraph/comm/RequestFailureTest.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/test/java/org/apache/giraph/comm/RequestFailureTest.java?rev=1408926&r1=1408925&r2=1408926&view=diff
==============================================================================
--- giraph/trunk/giraph/src/test/java/org/apache/giraph/comm/RequestFailureTest.java (original)
+++ giraph/trunk/giraph/src/test/java/org/apache/giraph/comm/RequestFailureTest.java Tue Nov 13 20:03:07 2012
@@ -128,29 +128,7 @@ public class RequestFailureTest {
 
   @Test
   public void send2Requests() throws IOException {
-    // Start the service
-    serverData = MockUtils.createNewServerData(conf, context);
-    server = new NettyServer(conf,
-        new WorkerRequestServerHandler.Factory(serverData));
-    server.start();
-    client = new NettyClient(context, conf);
-    client.connectAllAddresses(
-        Lists.<WorkerInfo>newArrayList(
-            new WorkerInfo(server.getMyAddress(), -1)));
-
-    // Send the request 2x
-    WritableRequest request1 = getRequest();
-    WritableRequest request2 = getRequest();
-    client.sendWritableRequest(-1, request1);
-    client.sendWritableRequest(-1, request2);
-    client.waitAllRequests();
-
-    // Stop the service
-    client.stop();
-    server.stop();
-
-    // Check the output (should have been only processed once)
-    checkResult(2);
+    checkSendingTwoRequests();
   }
 
   @Test
@@ -162,29 +140,7 @@ public class RequestFailureTest {
     // Loop every 2 seconds
     conf.setInt(GiraphConfiguration.WAITING_REQUEST_MSECS, 2000);
 
-    // Start the service
-    serverData = MockUtils.createNewServerData(conf, context);
-    server = new NettyServer(conf,
-        new WorkerRequestServerHandler.Factory(serverData));
-    server.start();
-    client = new NettyClient(context, conf);
-    client.connectAllAddresses(
-        Lists.<WorkerInfo>newArrayList(
-            new WorkerInfo(server.getMyAddress(), -1)));
-
-    // Send the request 2x, but should only be processed once
-    WritableRequest request1 = getRequest();
-    WritableRequest request2 = getRequest();
-    client.sendWritableRequest(-1, request1);
-    client.sendWritableRequest(-1, request2);
-    client.waitAllRequests();
-
-    // Stop the service
-    client.stop();
-    server.stop();
-
-    // Check the output (should have been only processed once)
-    checkResult(2);
+    checkSendingTwoRequests();
   }
 
   @Test
@@ -196,21 +152,26 @@ public class RequestFailureTest {
     // Loop every 2 seconds
     conf.setInt(GiraphConfiguration.WAITING_REQUEST_MSECS, 2000);
 
+    checkSendingTwoRequests();
+  }
+
+  private void checkSendingTwoRequests() throws IOException {
     // Start the service
     serverData = MockUtils.createNewServerData(conf, context);
+    WorkerInfo workerInfo = new WorkerInfo(-1);
     server = new NettyServer(conf,
-        new WorkerRequestServerHandler.Factory(serverData));
+        new WorkerRequestServerHandler.Factory(serverData), workerInfo);
     server.start();
-    client = new NettyClient(context, conf);
+    workerInfo.setInetSocketAddress(server.getMyAddress());
+    client = new NettyClient(context, conf, new WorkerInfo());
     client.connectAllAddresses(
-        Lists.<WorkerInfo>newArrayList(
-            new WorkerInfo(server.getMyAddress(), -1)));
+        Lists.<WorkerInfo>newArrayList(workerInfo));
 
     // Send the request 2x, but should only be processed once
     WritableRequest request1 = getRequest();
     WritableRequest request2 = getRequest();
-    client.sendWritableRequest(-1, request1);
-    client.sendWritableRequest(-1, request2);
+    client.sendWritableRequest(workerInfo.getTaskId(), request1);
+    client.sendWritableRequest(workerInfo.getTaskId(), request2);
     client.waitAllRequests();
 
     // Stop the service

Modified: giraph/trunk/giraph/src/test/java/org/apache/giraph/comm/RequestTest.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/test/java/org/apache/giraph/comm/RequestTest.java?rev=1408926&r1=1408925&r2=1408926&view=diff
==============================================================================
--- giraph/trunk/giraph/src/test/java/org/apache/giraph/comm/RequestTest.java (original)
+++ giraph/trunk/giraph/src/test/java/org/apache/giraph/comm/RequestTest.java Tue Nov 13 20:03:07 2012
@@ -67,6 +67,8 @@ public class RequestTest {
   private NettyServer server;
   /** Client */
   private NettyClient client;
+  /** Worker info */
+  private WorkerInfo workerInfo;
 
   /**
    * Only for testing.
@@ -92,13 +94,14 @@ public class RequestTest {
 
     // Start the service
     serverData = MockUtils.createNewServerData(conf, context);
+    workerInfo = new WorkerInfo(-1);
     server = new NettyServer(conf,
-        new WorkerRequestServerHandler.Factory(serverData));
+        new WorkerRequestServerHandler.Factory(serverData), workerInfo);
     server.start();
-    client = new NettyClient(context, conf);
+    workerInfo.setInetSocketAddress(server.getMyAddress());
+    client = new NettyClient(context, conf, new WorkerInfo());
     client.connectAllAddresses(
-        Lists.<WorkerInfo>newArrayList(
-            new WorkerInfo(server.getMyAddress(), -1)));
+        Lists.<WorkerInfo>newArrayList(workerInfo));
   }
 
   @Test
@@ -120,7 +123,7 @@ public class RequestTest {
     IntWritable> request =
       new SendVertexRequest<IntWritable, IntWritable,
       IntWritable, IntWritable>(partitionId, vertices);
-    client.sendWritableRequest(-1, request);
+    client.sendWritableRequest(workerInfo.getTaskId(), request);
     client.waitAllRequests();
 
     // Stop the service
@@ -165,7 +168,7 @@ public class RequestTest {
         IntWritable> request =
       new SendWorkerMessagesRequest<IntWritable, IntWritable,
             IntWritable, IntWritable>(dataToSend);
-    client.sendWritableRequest(-1, request);
+    client.sendWritableRequest(workerInfo.getTaskId(), request);
     client.waitAllRequests();
 
     // Stop the service
@@ -228,7 +231,7 @@ public class RequestTest {
     IntWritable> request =
       new SendPartitionMutationsRequest<IntWritable, IntWritable,
       IntWritable, IntWritable>(partitionId, vertexIdMutations);
-    client.sendWritableRequest(-1, request);
+    client.sendWritableRequest(workerInfo.getTaskId(), request);
     client.waitAllRequests();
 
     // Stop the service

Modified: giraph/trunk/giraph/src/test/java/org/apache/giraph/comm/SaslConnectionTest.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/test/java/org/apache/giraph/comm/SaslConnectionTest.java?rev=1408926&r1=1408925&r2=1408926&view=diff
==============================================================================
--- giraph/trunk/giraph/src/test/java/org/apache/giraph/comm/SaslConnectionTest.java (original)
+++ giraph/trunk/giraph/src/test/java/org/apache/giraph/comm/SaslConnectionTest.java Tue Nov 13 20:03:07 2012
@@ -83,16 +83,17 @@ public class SaslConnectionTest {
     when(mockedSaslServerFactory.newHandler(conf)).
         thenReturn(mockedSaslServerHandler);
 
+    WorkerInfo workerInfo = new WorkerInfo(-1);
     NettyServer server =
         new NettyServer(conf,
             new WorkerRequestServerHandler.Factory(serverData),
+            workerInfo,
             mockedSaslServerFactory);
     server.start();
+    workerInfo.setInetSocketAddress(server.getMyAddress());
 
-    NettyClient client = new NettyClient(context, conf);
-    client.connectAllAddresses(
-        Lists.<WorkerInfo>newArrayList(
-            new WorkerInfo(server.getMyAddress(), -1)));
+    NettyClient client = new NettyClient(context, conf, new WorkerInfo());
+    client.connectAllAddresses(Lists.<WorkerInfo>newArrayList(workerInfo));
 
     client.stop();
     server.stop();