You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ma...@apache.org on 2012/10/18 00:05:48 UTC

svn commit: r1399453 - in /giraph/trunk: ./ giraph/src/main/java/org/apache/giraph/bsp/ giraph/src/main/java/org/apache/giraph/comm/ giraph/src/main/java/org/apache/giraph/comm/netty/ giraph/src/main/java/org/apache/giraph/graph/ giraph/src/main/java/o...

Author: maja
Date: Wed Oct 17 22:05:47 2012
New Revision: 1399453

URL: http://svn.apache.org/viewvc?rev=1399453&view=rev
Log:
GIRAPH-372: Write worker addresses to Zookeeper; move addresses and resolution to NettyClient

Added:
    giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/AddressesAndPartitionsWritable.java
Modified:
    giraph/trunk/CHANGELOG
    giraph/trunk/giraph/src/main/java/org/apache/giraph/bsp/CentralizedService.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/MasterClient.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/WorkerClient.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyClient.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyMasterClient.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyMasterClientServer.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClient.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspService.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspServiceMaster.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspServiceWorker.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/InputSplitsCallable.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/WorkerInfo.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/WritableUtils.java
    giraph/trunk/giraph/src/test/java/org/apache/giraph/comm/ConnectionTest.java
    giraph/trunk/giraph/src/test/java/org/apache/giraph/comm/RequestFailureTest.java
    giraph/trunk/giraph/src/test/java/org/apache/giraph/comm/RequestTest.java
    giraph/trunk/giraph/src/test/java/org/apache/giraph/comm/SaslConnectionTest.java

Modified: giraph/trunk/CHANGELOG
URL: http://svn.apache.org/viewvc/giraph/trunk/CHANGELOG?rev=1399453&r1=1399452&r2=1399453&view=diff
==============================================================================
--- giraph/trunk/CHANGELOG (original)
+++ giraph/trunk/CHANGELOG Wed Oct 17 22:05:47 2012
@@ -1,6 +1,9 @@
 Giraph Change Log
 
 Release 0.2.0 - unreleased
+  GIRAPH-372: Write worker addresses to Zookeeper; 
+  move addresses and resolution to NettyClient (majakabiljo)
+
   GIRAPH-373: RandomMessageBenchmark is broken (majakabiljo).
 
   GIRAPH-374: Multithreading in input split loading and compute (aching).

Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/bsp/CentralizedService.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/bsp/CentralizedService.java?rev=1399453&r1=1399452&r2=1399453&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/bsp/CentralizedService.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/bsp/CentralizedService.java Wed Oct 17 22:05:47 2012
@@ -18,10 +18,12 @@
 
 package org.apache.giraph.bsp;
 
+import org.apache.giraph.graph.WorkerInfo;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 
 import java.io.IOException;
+import java.util.List;
 
 /**
  * Basic service interface shared by both {@link CentralizedServiceMaster} and
@@ -61,6 +63,13 @@ public interface CentralizedService<I ex
   boolean checkpointFrequencyMet(long superstep);
 
   /**
+   * Get list of workers
+   *
+   * @return List of workers
+   */
+  List<WorkerInfo> getWorkerInfoList();
+
+  /**
    * Clean up the service (no calls may be issued after this)
    *
    * @throws IOException

Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java?rev=1399453&r1=1399452&r2=1399453&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java Wed Oct 17 22:05:47 2012
@@ -93,13 +93,6 @@ public interface CentralizedServiceWorke
   PartitionStore<I, V, E, M> getPartitionStore();
 
   /**
-   * Get a collection of all the partition owners.
-   *
-   * @return Collection of all the partition owners.
-   */
-  Collection<? extends PartitionOwner> getPartitionOwners();
-
-  /**
    *  Both the vertices and the messages need to be checkpointed in order
    *  for them to be used.  This is done after all messages have been
    *  delivered, but prior to a superstep starting.

Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/MasterClient.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/MasterClient.java?rev=1399453&r1=1399452&r2=1399453&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/MasterClient.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/MasterClient.java Wed Oct 17 22:05:47 2012
@@ -18,18 +18,14 @@
 
 package org.apache.giraph.comm;
 
-import org.apache.giraph.graph.WorkerInfo;
-
 /**
  * Interface for master to send messages to workers
  */
 public interface MasterClient {
   /**
-   * Fix workers for current superstep
-   *
-   * @param workers Information about workers
+   * Make sure that all the connections to workers have been established.
    */
-  void fixWorkerAddresses(Iterable<WorkerInfo> workers);
+  void openConnections();
 
   /**
    * Flush all outgoing messages.  This will synchronously ensure that all

Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/WorkerClient.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/WorkerClient.java?rev=1399453&r1=1399452&r2=1399453&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/WorkerClient.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/WorkerClient.java Wed Oct 17 22:05:47 2012
@@ -18,11 +18,8 @@
 
 package org.apache.giraph.comm;
 
-import java.net.InetSocketAddress;
-import java.util.Collection;
 import org.apache.giraph.comm.requests.WritableRequest;
 
-import org.apache.giraph.graph.WorkerInfo;
 import org.apache.giraph.graph.partition.PartitionOwner;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
@@ -66,34 +63,18 @@ else[HADOOP_NON_SECURE]*/
   PartitionOwner getVertexPartitionOwner(I vertexId);
 
   /**
-   * Make sure that all the connections to the partitions owners have been
+   * Make sure that all the connections to workers and master have been
    * established.
-   *
-   * @param partitionOwners Partition owners to establish/check connections
-   */
-  void openConnections(
-      Collection<? extends PartitionOwner> partitionOwners);
-
-  /**
-   * Fill the socket address cache for the worker info and its partition.
-   *
-   * @param workerInfo Worker information to get the socket address
-   * @param partitionId Partition id to look up.
-   * @return address of the vertex range server containing this vertex
    */
-  InetSocketAddress getInetSocketAddress(WorkerInfo workerInfo,
-                                         int partitionId);
+  void openConnections();
 
   /**
    * Send a request to a remote server (should be already connected)
    *
    * @param destWorkerId Destination worker id
-   * @param remoteServer Server to send the request to
    * @param request Request to send
    */
-  void sendWritableRequest(Integer destWorkerId,
-                                  InetSocketAddress remoteServer,
-                                  WritableRequest request);
+  void sendWritableRequest(Integer destWorkerId, WritableRequest request);
 
   /**
    * Wait until all the outstanding requests are completed.

Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyClient.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyClient.java?rev=1399453&r1=1399452&r2=1399453&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyClient.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyClient.java Wed Oct 17 22:05:47 2012
@@ -24,6 +24,7 @@ import com.google.common.util.concurrent
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
+import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentMap;
@@ -47,6 +48,7 @@ import org.apache.giraph.comm.requests.R
 import org.apache.giraph.comm.requests.SaslTokenMessageRequest;
 /*end[HADOOP_NON_SECURE]*/
 import org.apache.giraph.comm.requests.WritableRequest;
+import org.apache.giraph.graph.WorkerInfo;
 import org.apache.giraph.utils.TimedLogger;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.log4j.Logger;
@@ -102,6 +104,11 @@ else[HADOOP_NON_SECURE]*/
   private final ConcurrentMap<InetSocketAddress, ChannelRotater>
   addressChannelMap = new MapMaker().makeMap();
   /**
+   * Map from task id to address of its server
+   */
+  private final Map<Integer, InetSocketAddress> taskIdAddressMap =
+      new MapMaker().makeMap();
+  /**
    * Request map of client request ids to request information.
    */
   private final ConcurrentMap<ClientRequestId, RequestInfo>
@@ -122,8 +129,6 @@ else[HADOOP_NON_SECURE]*/
   private final int maxConnectionFailures;
   /** Maximum number of milliseconds for a request */
   private final int maxRequestMilliseconds;
-  /** Maximum number of reconnection failures */
-  private final int maxReconnectionFailures;
   /** Waiting internal for checking outstanding requests msecs */
   private final int waitingRequestMsecs;
   /** Timed logger for printing request debugging */
@@ -139,6 +144,8 @@ else[HADOOP_NON_SECURE]*/
   private final int clientId;
   /** Maximum thread pool size */
   private final int maxPoolSize;
+  /** Maximum number of attempts to resolve an address*/
+  private final int maxResolveAddressAttempts;
   /** Execution handler (if used) */
   private final ExecutionHandler executionHandler;
   /** Name of the handler before the execution handler (if used) */
@@ -186,10 +193,6 @@ else[HADOOP_NON_SECURE]*/
         GiraphConfiguration.NETTY_MAX_CONNECTION_FAILURES,
         GiraphConfiguration.NETTY_MAX_CONNECTION_FAILURES_DEFAULT);
 
-    maxReconnectionFailures = conf.getInt(
-        GiraphConfiguration.MAX_RECONNECT_ATTEMPTS,
-        GiraphConfiguration.MAX_RECONNECT_ATTEMPTS_DEFAULT);
-
     waitingRequestMsecs = conf.getInt(
         GiraphConfiguration.WAITING_REQUEST_MSECS,
         GiraphConfiguration.WAITING_REQUEST_MSECS_DEFAULT);
@@ -198,6 +201,10 @@ else[HADOOP_NON_SECURE]*/
         GiraphConfiguration.NETTY_CLIENT_THREADS,
         GiraphConfiguration.NETTY_CLIENT_THREADS_DEFAULT);
 
+    maxResolveAddressAttempts = conf.getInt(
+        GiraphConfiguration.MAX_RESOLVE_ADDRESS_ATTEMPTS,
+        GiraphConfiguration.MAX_RESOLVE_ADDRESS_ATTEMPTS_DEFAULT);
+
     clientRequestIdRequestInfoMap =
         new MapMaker().concurrencyLevel(maxPoolSize).makeMap();
 
@@ -342,20 +349,27 @@ else[HADOOP_NON_SECURE]*/
   }
 
   /**
-   * Connect to a collection of addresses
+   * Connect to a collection of tasks servers
    *
-   * @param addresses Addresses to connect to (if haven't already connected)
+   * @param tasks Tasks to connect to (if haven't already connected)
    */
-  public void connectAllAddresses(Map<InetSocketAddress, Integer> addresses) {
+  public void connectAllAddresses(Collection<WorkerInfo> tasks) {
     List<ChannelFutureAddress> waitingConnectionList =
-        Lists.newArrayListWithCapacity(addresses.size() * channelsPerServer);
-    for (Map.Entry<InetSocketAddress, Integer> entry : addresses.entrySet()) {
+        Lists.newArrayListWithCapacity(tasks.size() * channelsPerServer);
+    for (WorkerInfo taskInfo : tasks) {
       context.progress();
-      InetSocketAddress address = entry.getKey();
+      InetSocketAddress address = taskIdAddressMap.get(taskInfo.getTaskId());
+      if (address == null ||
+          !address.getHostName().equals(taskInfo.getHostname()) ||
+          address.getPort() != taskInfo.getPort()) {
+        address = resolveAddress(maxResolveAddressAttempts,
+            taskInfo.getInetSocketAddress());
+        taskIdAddressMap.put(taskInfo.getTaskId(), address);
+      }
       if (address == null || address.getHostName() == null ||
           address.getHostName().isEmpty()) {
         throw new IllegalStateException("connectAllAddresses: Null address " +
-            "in addresses " + addresses);
+            "in addresses " + tasks);
       }
       if (address.isUnresolved()) {
         throw new IllegalStateException("connectAllAddresses: Unresolved " +
@@ -372,7 +386,7 @@ else[HADOOP_NON_SECURE]*/
 
         waitingConnectionList.add(
             new ChannelFutureAddress(
-                connectionFuture, address, entry.getValue()));
+                connectionFuture, address, taskInfo.getTaskId()));
       }
     }
 
@@ -490,9 +504,7 @@ else[HADOOP_NON_SECURE]*/
               "to complete..");
         }
         SaslTokenMessageRequest saslTokenMessage = saslNettyClient.firstToken();
-        sendWritableRequest(taskId, (InetSocketAddress) channel
-            .getRemoteAddress(),
-          saslTokenMessage);
+        sendWritableRequest(taskId, saslTokenMessage);
         // We now wait for Netty's thread pool to communicate over this
         // channel to authenticate with another worker acting as a server.
         try {
@@ -611,12 +623,11 @@ else[HADOOP_NON_SECURE]*/
    * Send a request to a remote server (should be already connected)
    *
    * @param destWorkerId Destination worker id
-   * @param remoteServer Server to send the request to
    * @param request Request to send
    */
   public void sendWritableRequest(Integer destWorkerId,
-                                  InetSocketAddress remoteServer,
-                                  WritableRequest request) {
+      WritableRequest request) {
+    InetSocketAddress remoteServer = taskIdAddressMap.get(destWorkerId);
     if (clientRequestIdRequestInfoMap.isEmpty()) {
       byteCounter.resetAll();
     }
@@ -756,4 +767,38 @@ else[HADOOP_NON_SECURE]*/
       addedRequestInfos.clear();
     }
   }
+
+  /**
+   * Utility method for resolving addresses
+   *
+   * @param maxResolveAddressAttempts Maximum number of attempts to resolve the
+   *        address
+   * @param address The address we are attempting to resolve
+   * @return The successfully resolved address.
+   * @throws IllegalStateException if the address is not resolved
+   *         in <code>maxResolveAddressAttempts</code> tries.
+   */
+  private static InetSocketAddress resolveAddress(
+      int maxResolveAddressAttempts, InetSocketAddress address) {
+    int resolveAttempts = 0;
+    while (address.isUnresolved() &&
+        resolveAttempts < maxResolveAddressAttempts) {
+      ++resolveAttempts;
+      LOG.warn("resolveAddress: Failed to resolve " + address +
+          " on attempt " + resolveAttempts + " of " +
+          maxResolveAddressAttempts + " attempts, sleeping for 5 seconds");
+      try {
+        Thread.sleep(5000);
+      } catch (InterruptedException e) {
+        LOG.warn("resolveAddress: Interrupted.", e);
+      }
+      address = new InetSocketAddress(address.getHostName(),
+          address.getPort());
+    }
+    if (resolveAttempts >= maxResolveAddressAttempts) {
+      throw new IllegalStateException("resolveAddress: Couldn't " +
+          "resolve " + address + " in " +  resolveAttempts + " tries.");
+    }
+    return address;
+  }
 }

Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyMasterClient.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyMasterClient.java?rev=1399453&r1=1399452&r2=1399453&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyMasterClient.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyMasterClient.java Wed Oct 17 22:05:47 2012
@@ -18,19 +18,11 @@
 
 package org.apache.giraph.comm.netty;
 
-import com.google.common.collect.Maps;
-import java.util.Map;
 import org.apache.giraph.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.bsp.CentralizedServiceMaster;
 import org.apache.giraph.comm.MasterClient;
-import org.apache.giraph.graph.WorkerInfo;
 import org.apache.hadoop.mapreduce.Mapper;
 
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
-
-import java.net.InetSocketAddress;
-import java.util.Collection;
-
 /**
  * Netty implementation of {@link MasterClient}
  */
@@ -38,30 +30,25 @@ public class NettyMasterClient implement
   /** Netty client that does the actual I/O */
   private final NettyClient nettyClient;
   /** Worker information for current superstep */
-  private Collection<WorkerInfo> workers;
+  private CentralizedServiceMaster<?, ?, ?, ?> service;
 
   /**
    * Constructor
    *
    * @param context Context from mapper
    * @param configuration Configuration
+   * @param service Centralized service
    */
   public NettyMasterClient(Mapper<?, ?, ?, ?>.Context context,
-                           ImmutableClassesGiraphConfiguration configuration) {
+                           ImmutableClassesGiraphConfiguration configuration,
+                           CentralizedServiceMaster<?, ?, ?, ?> service) {
     this.nettyClient = new NettyClient(context, configuration);
-    workers = Lists.newArrayList();
+    this.service = service;
   }
 
   @Override
-  public void fixWorkerAddresses(Iterable<WorkerInfo> workers) {
-    this.workers.clear();
-    Iterables.addAll(this.workers, workers);
-    Map<InetSocketAddress, Integer> addresses =
-        Maps.newHashMapWithExpectedSize(this.workers.size());
-    for (WorkerInfo worker : workers) {
-      addresses.put(worker.getInetSocketAddress(), worker.getTaskId());
-    }
-    nettyClient.connectAllAddresses(addresses);
+  public void openConnections() {
+    nettyClient.connectAllAddresses(service.getWorkerInfoList());
   }
 
   @Override

Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyMasterClientServer.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyMasterClientServer.java?rev=1399453&r1=1399452&r2=1399453&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyMasterClientServer.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyMasterClientServer.java Wed Oct 17 22:05:47 2012
@@ -19,10 +19,10 @@
 package org.apache.giraph.comm.netty;
 
 import org.apache.giraph.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.bsp.CentralizedServiceMaster;
 import org.apache.giraph.comm.MasterClient;
 import org.apache.giraph.comm.MasterClientServer;
 import org.apache.giraph.comm.MasterServer;
-import org.apache.giraph.graph.WorkerInfo;
 import org.apache.hadoop.mapreduce.Mapper;
 
 import java.net.InetSocketAddress;
@@ -41,17 +41,19 @@ public class NettyMasterClientServer imp
    *
    * @param context Mapper context
    * @param configuration Configuration
+   * @param service Centralized service
    */
   public NettyMasterClientServer(
       Mapper<?, ?, ?, ?>.Context context,
-      ImmutableClassesGiraphConfiguration configuration) {
-    client = new NettyMasterClient(context, configuration);
+      ImmutableClassesGiraphConfiguration configuration,
+      CentralizedServiceMaster<?, ?, ?, ?> service) {
+    client = new NettyMasterClient(context, configuration, service);
     server = new NettyMasterServer(configuration);
   }
 
   @Override
-  public void fixWorkerAddresses(Iterable<WorkerInfo> workers) {
-    client.fixWorkerAddresses(workers);
+  public void openConnections() {
+    client.openConnections();
   }
 
   @Override

Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClient.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClient.java?rev=1399453&r1=1399452&r2=1399453&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClient.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClient.java Wed Oct 17 22:05:47 2012
@@ -18,8 +18,6 @@
 
 package org.apache.giraph.comm.netty;
 
-import java.util.concurrent.ConcurrentMap;
-import org.apache.giraph.GiraphConfiguration;
 import org.apache.giraph.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.bsp.CentralizedServiceWorker;
 import org.apache.giraph.comm.WorkerClient;
@@ -31,13 +29,10 @@ import org.apache.hadoop.io.WritableComp
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.log4j.Logger;
 
-import com.google.common.collect.Maps;
+import com.google.common.collect.Lists;
 
 import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.Collection;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
+import java.util.List;
 
 /**
  * Takes users facing APIs in {@link WorkerClient} and implements them
@@ -52,8 +47,6 @@ import java.util.concurrent.ConcurrentHa
 public class NettyWorkerClient<I extends WritableComparable,
     V extends Writable, E extends Writable, M extends Writable> implements
     WorkerClient<I, V, E, M> {
-  /** Signal for getInetSocketAddress() to use WorkerInfo's address */
-  public static final int NO_PARTITION_ID = Integer.MIN_VALUE;
   /** Class logger */
   private static final Logger LOG = Logger.getLogger(NettyWorkerClient.class);
   /** Hadoop configuration */
@@ -62,14 +55,6 @@ public class NettyWorkerClient<I extends
   private final NettyClient nettyClient;
   /** Centralized service, needed to get vertex ranges */
   private final CentralizedServiceWorker<I, V, E, M> service;
-  /**
-   * Cached map of partition ids to remote socket address.
-   */
-  private final ConcurrentMap<Integer, InetSocketAddress>
-  partitionIndexAddressMap =
-      new ConcurrentHashMap<Integer, InetSocketAddress>();
-  /** Maximum number of attempts to resolve an address*/
-  private final int maxResolveAddressAttempts;
 
   /**
    * Only constructor.
@@ -85,10 +70,6 @@ public class NettyWorkerClient<I extends
     this.nettyClient = new NettyClient(context, configuration);
     this.conf = configuration;
     this.service = service;
-
-    maxResolveAddressAttempts = conf.getInt(
-        GiraphConfiguration.MAX_RESOLVE_ADDRESS_ATTEMPTS,
-        GiraphConfiguration.MAX_RESOLVE_ADDRESS_ATTEMPTS_DEFAULT);
   }
 
   public CentralizedServiceWorker<I, V, E, M> getService() {
@@ -96,94 +77,17 @@ public class NettyWorkerClient<I extends
   }
 
   @Override
-  public void openConnections(
-      Collection<? extends PartitionOwner> partitionOwners) {
-    // 1. Fix all the cached inet addresses (remove all changed entries)
-    // 2. Connect to any new IPC servers
-    Map<InetSocketAddress, Integer> addressTaskIdMap =
-        Maps.newHashMapWithExpectedSize(partitionOwners.size());
-    for (PartitionOwner partitionOwner : partitionOwners) {
-      InetSocketAddress address =
-          partitionIndexAddressMap.get(
-              partitionOwner.getPartitionId());
-      if (address != null &&
-          (!address.getHostName().equals(
-              partitionOwner.getWorkerInfo().getHostname()) ||
-              address.getPort() !=
-              partitionOwner.getWorkerInfo().getPort())) {
-        if (LOG.isInfoEnabled()) {
-          LOG.info("fixPartitionIdToSocketAddrMap: " +
-              "Partition owner " +
-              partitionOwner + " changed from " +
-              address);
-        }
-        partitionIndexAddressMap.remove(
-            partitionOwner.getPartitionId());
-      }
-
+  public void openConnections() {
+    List<WorkerInfo> addresses = Lists.newArrayListWithCapacity(
+        service.getWorkerInfoList().size());
+    for (WorkerInfo info : service.getWorkerInfoList()) {
       // No need to connect to myself
-      if (service.getWorkerInfo().getTaskId() !=
-          partitionOwner.getWorkerInfo().getTaskId()) {
-        addressTaskIdMap.put(
-            getInetSocketAddress(partitionOwner.getWorkerInfo(),
-                partitionOwner.getPartitionId()),
-            partitionOwner.getWorkerInfo().getTaskId());
+      if (service.getWorkerInfo().getTaskId() != info.getTaskId()) {
+        addresses.add(info);
       }
     }
-
-    addressTaskIdMap.put(service.getMasterInfo().getInetSocketAddress(),
-        null);
-    nettyClient.connectAllAddresses(addressTaskIdMap);
-  }
-
-  @Override
-  public InetSocketAddress getInetSocketAddress(WorkerInfo workerInfo,
-      int partitionId) {
-    InetSocketAddress address = partitionIndexAddressMap.get(partitionId);
-    if (address == null) {
-      address = resolveAddress(maxResolveAddressAttempts,
-          workerInfo.getInetSocketAddress());
-      if (partitionId != NO_PARTITION_ID) {
-        // Only cache valid partition ids
-        partitionIndexAddressMap.put(partitionId, address);
-      }
-    }
-
-    return address;
-  }
-
-  /**
-   * Utility method for getInetSocketAddress()
-   *
-   * @param maxResolveAddressAttempts Maximum number of attempts to resolve the
-   *        address
-   * @param address the address we are attempting to resolve
-   * @return the successfully resolved address.
-   * @throws IllegalStateException if the address is not resolved
-   *         in <code>maxResolveAddressAttempts</code> tries.
-   */
-  private static InetSocketAddress resolveAddress(
-      int maxResolveAddressAttempts, InetSocketAddress address) {
-    int resolveAttempts = 0;
-    while (address.isUnresolved() &&
-      resolveAttempts < maxResolveAddressAttempts) {
-      ++resolveAttempts;
-      LOG.warn("resolveAddress: Failed to resolve " + address +
-        " on attempt " + resolveAttempts + " of " +
-        maxResolveAddressAttempts + " attempts, sleeping for 5 seconds");
-      try {
-        Thread.sleep(5000);
-      } catch (InterruptedException e) {
-        LOG.warn("resolveAddress: Interrupted.", e);
-      }
-      address = new InetSocketAddress(address.getHostName(),
-          address.getPort());
-    }
-    if (resolveAttempts >= maxResolveAddressAttempts) {
-      throw new IllegalStateException("resolveAddress: Couldn't " +
-        "resolve " + address + " in " +  resolveAttempts + " tries.");
-    }
-    return address;
+    addresses.add(service.getMasterInfo());
+    nettyClient.connectAllAddresses(addresses);
   }
 
   @Override
@@ -193,9 +97,8 @@ public class NettyWorkerClient<I extends
 
   @Override
   public void sendWritableRequest(Integer destWorkerId,
-                                  InetSocketAddress remoteServer,
                                   WritableRequest request) {
-    nettyClient.sendWritableRequest(destWorkerId, remoteServer, request);
+    nettyClient.sendWritableRequest(destWorkerId, request);
   }
 
   @Override
@@ -216,7 +119,7 @@ public class NettyWorkerClient<I extends
 else[HADOOP_NON_SECURE]*/
   @Override
   public void setup(boolean authenticate) {
-    openConnections(service.getPartitionOwners());
+    openConnections();
     if (authenticate) {
       authenticate();
     }

Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java?rev=1399453&r1=1399452&r2=1399453&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java Wed Oct 17 22:05:47 2012
@@ -19,9 +19,7 @@ package org.apache.giraph.comm.netty;
 
 import com.google.common.collect.Maps;
 import java.io.IOException;
-import java.net.InetSocketAddress;
 import java.util.Collection;
-import java.util.Iterator;
 import java.util.Map;
 import org.apache.giraph.GiraphConfiguration;
 import org.apache.giraph.ImmutableClassesGiraphConfiguration;
@@ -137,11 +135,9 @@ public class NettyWorkerClientRequestPro
     if (workerMessageCount >= maxMessagesPerWorker) {
       Map<Integer, Map<I, Collection<M>>> workerMessages =
           sendMessageCache.removeWorkerMessages(workerInfo);
-      InetSocketAddress remoteWorkerAddress =
-          workerClient.getInetSocketAddress(workerInfo, partitionId);
       WritableRequest writableRequest =
           new SendWorkerMessagesRequest<I, V, E, M>(workerMessages);
-      doRequest(workerInfo, remoteWorkerAddress, writableRequest);
+      doRequest(workerInfo, writableRequest);
     }
   }
 
@@ -149,18 +145,15 @@ public class NettyWorkerClientRequestPro
   public void sendPartitionRequest(WorkerInfo workerInfo,
                                    Partition<I, V, E, M> partition) {
     final int partitionId = partition.getId();
-    InetSocketAddress remoteServerAddress =
-        workerClient.getInetSocketAddress(workerInfo, partitionId);
     if (LOG.isTraceEnabled()) {
-      LOG.trace("sendVertexRequest: Sending to " +
-          remoteServerAddress +
-          " from " + workerInfo + ", with partition " + partition);
+      LOG.trace("sendVertexRequest: Sending to " + workerInfo +
+          ", with partition " + partition);
     }
 
     WritableRequest vertexRequest =
         new SendVertexRequest<I, V, E, M>(partitionId,
             partition.getVertices());
-    doRequest(workerInfo, remoteServerAddress, vertexRequest);
+    doRequest(workerInfo, vertexRequest);
 
     // Messages are stored separately
     MessageStoreByPartition<I, M> messageStore =
@@ -180,7 +173,7 @@ public class NettyWorkerClientRequestPro
       if (messagesInMap > maxMessagesPerWorker) {
         WritableRequest messagesRequest = new
             SendPartitionCurrentMessagesRequest<I, V, E, M>(partitionId, map);
-        doRequest(workerInfo, remoteServerAddress, messagesRequest);
+        doRequest(workerInfo, messagesRequest);
         map.clear();
         messagesInMap = 0;
       }
@@ -188,7 +181,7 @@ public class NettyWorkerClientRequestPro
     if (!map.isEmpty()) {
       WritableRequest messagesRequest = new
           SendPartitionCurrentMessagesRequest<I, V, E, M>(partitionId, map);
-      doRequest(workerInfo, remoteServerAddress, messagesRequest);
+      doRequest(workerInfo, messagesRequest);
     }
   }
 
@@ -236,16 +229,12 @@ public class NettyWorkerClientRequestPro
       int partitionMutationCount) {
     // Send a request if enough mutations are there for a partition
     if (partitionMutationCount >= maxMutationsPerPartition) {
-      InetSocketAddress remoteServerAddress =
-          workerClient.getInetSocketAddress(
-              partitionOwner.getWorkerInfo(), partitionId);
       Map<I, VertexMutations<I, V, E, M>> partitionMutations =
           sendMutationsCache.removePartitionMutations(partitionId);
       WritableRequest writableRequest =
           new SendPartitionMutationsRequest<I, V, E, M>(
               partitionId, partitionMutations);
-      doRequest(partitionOwner.getWorkerInfo(), remoteServerAddress,
-          writableRequest);
+      doRequest(partitionOwner.getWorkerInfo(), writableRequest);
     }
   }
 
@@ -320,15 +309,9 @@ public class NettyWorkerClientRequestPro
         remainingMessageCache = sendMessageCache.removeAllMessages();
     for (Map.Entry<WorkerInfo, Map<Integer, Map<I, Collection<M>>>> entry :
         remainingMessageCache.entrySet()) {
-      Iterator<Integer> cachedPartitionId =
-          entry.getValue().keySet().iterator();
-      final int partitionId = cachedPartitionId.hasNext() ?
-          cachedPartitionId.next() : NettyWorkerClient.NO_PARTITION_ID;
-      InetSocketAddress remoteWorkerAddress =
-          workerClient.getInetSocketAddress(entry.getKey(), partitionId);
       WritableRequest writableRequest =
           new SendWorkerMessagesRequest<I, V, E, M>(entry.getValue());
-      doRequest(entry.getKey(), remoteWorkerAddress, writableRequest);
+      doRequest(entry.getKey(), writableRequest);
     }
 
     // Execute the remaining sends mutations (if any)
@@ -342,11 +325,7 @@ public class NettyWorkerClientRequestPro
       PartitionOwner partitionOwner =
           serviceWorker.getVertexPartitionOwner(
               entry.getValue().keySet().iterator().next());
-      InetSocketAddress remoteServerAddress =
-          workerClient.getInetSocketAddress(partitionOwner.getWorkerInfo(),
-              partitionOwner.getPartitionId());
-      doRequest(partitionOwner.getWorkerInfo(), remoteServerAddress,
-          writableRequest);
+      doRequest(partitionOwner.getWorkerInfo(), writableRequest);
     }
   }
 
@@ -366,11 +345,9 @@ public class NettyWorkerClientRequestPro
    * When doing the request, short circuit if it is local
    *
    * @param workerInfo Worker info
-   * @param remoteServerAddress Remote server address
    * @param writableRequest Request to either submit or run locally
    */
   private void doRequest(WorkerInfo workerInfo,
-                         InetSocketAddress remoteServerAddress,
                          WritableRequest writableRequest) {
     // If this is local, execute locally
     if (serviceWorker.getWorkerInfo().getTaskId() ==
@@ -378,7 +355,7 @@ public class NettyWorkerClientRequestPro
       ((WorkerRequest) writableRequest).doRequest(serverData);
     } else {
       workerClient.sendWritableRequest(
-          workerInfo.getTaskId(), remoteServerAddress, writableRequest);
+          workerInfo.getTaskId(), writableRequest);
     }
   }
 }

Added: giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/AddressesAndPartitionsWritable.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/AddressesAndPartitionsWritable.java?rev=1399453&view=auto
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/AddressesAndPartitionsWritable.java (added)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/AddressesAndPartitionsWritable.java Wed Oct 17 22:05:47 2012
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.graph;
+
+import org.apache.giraph.graph.partition.PartitionOwner;
+import org.apache.hadoop.io.Writable;
+
+import com.google.common.collect.Lists;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+
+/**
+ * Helper class to write descriptions of master, workers and partition owners
+ */
+public class AddressesAndPartitionsWritable implements Writable {
+  /** Master information */
+  private WorkerInfo masterInfo;
+  /** List of all workers */
+  private List<WorkerInfo> workerInfos;
+  /** Collection of partitions */
+  private Collection<PartitionOwner> partitionOwners;
+  /** Partition owner class, used to deserialize object */
+  private Class<? extends PartitionOwner> partitionOwnerClass;
+
+  /**
+   * Constructor when we want to serialize object
+   *
+   * @param masterInfo Master information
+   * @param workerInfos List of all workers
+   * @param partitionOwners Collection of partitions
+   */
+  public AddressesAndPartitionsWritable(WorkerInfo masterInfo,
+      List<WorkerInfo> workerInfos,
+      Collection<PartitionOwner> partitionOwners) {
+    this.masterInfo = masterInfo;
+    this.workerInfos = workerInfos;
+    this.partitionOwners = partitionOwners;
+  }
+
+  /**
+   * Constructor when we want to deserialize object
+   *
+   * @param partitionOwnerClass Partition owner class
+   */
+  public AddressesAndPartitionsWritable(
+      Class<? extends PartitionOwner> partitionOwnerClass) {
+    this.partitionOwnerClass = partitionOwnerClass;
+  }
+
+  /**
+   * Get master information
+   *
+   * @return Master information
+   */
+  public WorkerInfo getMasterInfo() {
+    return masterInfo;
+  }
+
+  /**
+   * Get all workers
+   *
+   * @return List of all workers
+   */
+  public List<WorkerInfo> getWorkerInfos() {
+    return workerInfos;
+  }
+
+  /**
+   * Get partition owners
+   *
+   * @return Collection of partition owners
+   */
+  public Collection<PartitionOwner> getPartitionOwners() {
+    return partitionOwners;
+  }
+
+  @Override
+  public void write(DataOutput output) throws IOException {
+    masterInfo.write(output);
+
+    output.writeInt(workerInfos.size());
+    for (WorkerInfo workerInfo : workerInfos) {
+      workerInfo.write(output);
+    }
+
+    output.writeInt(partitionOwners.size());
+    for (PartitionOwner partitionOwner : partitionOwners) {
+      partitionOwner.write(output);
+    }
+  }
+
+  @Override
+  public void readFields(DataInput input) throws IOException {
+    masterInfo = new WorkerInfo();
+    masterInfo.readFields(input);
+
+    int workerInfosSize = input.readInt();
+    workerInfos = Lists.newArrayListWithCapacity(workerInfosSize);
+    for (int i = 0; i < workerInfosSize; i++) {
+      WorkerInfo workerInfo = new WorkerInfo();
+      workerInfo.readFields(input);
+      workerInfos.add(workerInfo);
+    }
+
+    int partitionOwnersSize = input.readInt();
+    partitionOwners = Lists.newArrayListWithCapacity(partitionOwnersSize);
+    for (int i = 0; i < partitionOwnersSize; i++) {
+      try {
+        PartitionOwner partitionOwner = partitionOwnerClass.newInstance();
+        partitionOwner.readFields(input);
+        partitionOwners.add(partitionOwner);
+      } catch (InstantiationException e) {
+        throw new IllegalStateException("readFields: " +
+            "InstantiationException on partition owner class " +
+            partitionOwnerClass, e);
+      } catch (IllegalAccessException e) {
+        throw new IllegalStateException("readFields: " +
+            "IllegalAccessException on partition owner class " +
+            partitionOwnerClass, e);
+      }
+    }
+  }
+}

Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspService.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspService.java?rev=1399453&r1=1399452&r2=1399453&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspService.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspService.java Wed Oct 17 22:05:47 2012
@@ -93,8 +93,6 @@ public abstract class BspService<I exten
       "/_applicationAttemptsDir";
   /** Where the master election happens */
   public static final String MASTER_ELECTION_DIR = "/_masterElectionDir";
-  /** Denotes current master's info */
-  public static final String CURRENT_MASTER_INFO_NODE = "/_currentMaster";
   /** Superstep scope */
   public static final String SUPERSTEP_DIR = "/_superstepDir";
   /** Where the merged aggregators are located */
@@ -109,9 +107,9 @@ public abstract class BspService<I exten
       "/_workerWroteCheckpointDir";
   /** Finished workers notify here */
   public static final String WORKER_FINISHED_DIR = "/_workerFinishedDir";
-  /** Where the partition assignments are set */
-  public static final String PARTITION_ASSIGNMENTS_DIR =
-      "/_partitionAssignments";
+  /** Where the master and worker addresses and partition assignments are set */
+  public static final String ADDRESSES_AND_PARTITIONS_DIR =
+      "/_addressesAndPartitions";
   /** Helps coordinate the partition exchnages */
   public static final String PARTITION_EXCHANGE_DIR =
       "/_partitionExchangeDir";
@@ -202,8 +200,6 @@ public abstract class BspService<I exten
   protected final String checkpointBasePath;
   /** Path to the master election path */
   protected final String masterElectionPath;
-  /** Path to current master info */
-  protected final String currentMasterPath;
   /** Private ZooKeeper instance that implements the service */
   private final ZooKeeperExt zk;
   /** Has the Connection occurred? */
@@ -218,8 +214,8 @@ public abstract class BspService<I exten
   private final BspEvent inputSplitsAllDoneChanged;
   /** InputSplit done by a worker finished notification and synchronization */
   private final BspEvent inputSplitsDoneStateChanged;
-  /** Are the partition assignments to workers ready? */
-  private final BspEvent partitionAssignmentsReadyChanged;
+  /** Are the addresses and partition assignments to workers ready? */
+  private final BspEvent addressesAndPartitionsReadyChanged;
   /** Application attempt changed */
   private final BspEvent applicationAttemptChanged;
   /** Superstep finished synchronization */
@@ -276,7 +272,7 @@ public abstract class BspService<I exten
     this.inputSplitsStateChanged = new PredicateLock(context);
     this.inputSplitsAllDoneChanged = new PredicateLock(context);
     this.inputSplitsDoneStateChanged = new PredicateLock(context);
-    this.partitionAssignmentsReadyChanged = new PredicateLock(context);
+    this.addressesAndPartitionsReadyChanged = new PredicateLock(context);
     this.applicationAttemptChanged = new PredicateLock(context);
     this.superstepFinished = new PredicateLock(context);
     this.masterElectionChildrenChanged = new PredicateLock(context);
@@ -286,7 +282,7 @@ public abstract class BspService<I exten
     registerBspEvent(workerHealthRegistrationChanged);
     registerBspEvent(inputSplitsAllReadyChanged);
     registerBspEvent(inputSplitsStateChanged);
-    registerBspEvent(partitionAssignmentsReadyChanged);
+    registerBspEvent(addressesAndPartitionsReadyChanged);
     registerBspEvent(applicationAttemptChanged);
     registerBspEvent(superstepFinished);
     registerBspEvent(masterElectionChildrenChanged);
@@ -332,7 +328,6 @@ public abstract class BspService<I exten
         GiraphConfiguration.CHECKPOINT_DIRECTORY,
         GiraphConfiguration.CHECKPOINT_DIRECTORY_DEFAULT + "/" + getJobId());
     masterElectionPath = basePath + MASTER_ELECTION_DIR;
-    currentMasterPath = basePath + CURRENT_MASTER_INFO_NODE;
     if (LOG.isInfoEnabled()) {
       LOG.info("BspService: Connecting to ZooKeeper with job " + jobId +
           ", " + getTaskPartition() + " on " + serverPortList);
@@ -460,16 +455,16 @@ public abstract class BspService<I exten
   }
 
   /**
-   * Generate the "partiton assignments" directory path for a superstep
+   * Generate the "addresses and partitions" directory path for a superstep
    *
    * @param attempt application attempt number
    * @param superstep superstep to use
    * @return directory path based on the a superstep
    */
-  public final String getPartitionAssignmentsPath(long attempt,
+  public final String getAddressesAndPartitionsPath(long attempt,
       long superstep) {
     return applicationAttemptsPath + "/" + attempt +
-        SUPERSTEP_DIR + "/" + superstep + PARTITION_ASSIGNMENTS_DIR;
+        SUPERSTEP_DIR + "/" + superstep + ADDRESSES_AND_PARTITIONS_DIR;
   }
 
   /**
@@ -656,8 +651,8 @@ public abstract class BspService<I exten
     return inputSplitsDoneStateChanged;
   }
 
-  public final BspEvent getPartitionAssignmentsReadyChangedEvent() {
-    return partitionAssignmentsReadyChanged;
+  public final BspEvent getAddressesAndPartitionsReadyChangedEvent() {
+    return addressesAndPartitionsReadyChanged;
   }
 
 
@@ -1014,13 +1009,13 @@ public abstract class BspService<I exten
       }
       inputSplitsAllDoneChanged.signal();
       eventProcessed = true;
-    } else if (event.getPath().contains(PARTITION_ASSIGNMENTS_DIR) &&
+    } else if (event.getPath().contains(ADDRESSES_AND_PARTITIONS_DIR) &&
         event.getType() == EventType.NodeCreated) {
       if (LOG.isInfoEnabled()) {
         LOG.info("process: partitionAssignmentsReadyChanged " +
             "(partitions are assigned)");
       }
-      partitionAssignmentsReadyChanged.signal();
+      addressesAndPartitionsReadyChanged.signal();
       eventProcessed = true;
     } else if (event.getPath().contains(SUPERSTEP_FINISHED_NODE) &&
         event.getType() == EventType.NodeCreated) {

Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspServiceMaster.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspServiceMaster.java?rev=1399453&r1=1399452&r2=1399453&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspServiceMaster.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspServiceMaster.java Wed Oct 17 22:05:47 2012
@@ -55,6 +55,7 @@ import org.apache.zookeeper.ZooDefs.Ids;
 import org.json.JSONException;
 import org.json.JSONObject;
 
+import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 import net.iharder.Base64;
 
@@ -149,6 +150,8 @@ public class BspServiceMaster<I extends 
   private MasterClientServer commService;
   /** Master info */
   private WorkerInfo masterInfo;
+  /** List of workers in current superstep */
+  private List<WorkerInfo> chosenWorkerInfoList = Lists.newArrayList();
   /** Limit locality information added to each InputSplit znode */
   private final int localityLimit = 5;
 
@@ -602,6 +605,11 @@ public class BspServiceMaster<I extends 
   }
 
   @Override
+  public List<WorkerInfo> getWorkerInfoList() {
+    return chosenWorkerInfoList;
+  }
+
+  @Override
   public MasterAggregatorUsage getAggregatorUsage() {
     return aggregatorHandler;
   }
@@ -763,12 +771,9 @@ public class BspServiceMaster<I extends 
           aggregatorHandler.initialize(this);
 
           commService = new NettyMasterClientServer(
-              getContext(), getConfiguration());
+              getContext(), getConfiguration(), this);
           masterInfo = new WorkerInfo(getHostname(), getTaskPartition(),
               commService.getMyAddress().getPort());
-          // write my address to znode so workers could read it
-          WritableUtils.writeToZnode(getZkExt(), currentMasterPath, -1,
-              masterInfo);
 
           if (LOG.isInfoEnabled()) {
             LOG.info("becomeMaster: I am now the master!");
@@ -796,7 +801,7 @@ public class BspServiceMaster<I extends 
    * @return Global statistics aggregated on all worker statistics
    */
   private GlobalStats aggregateWorkerStats(long superstep) {
-    Class<? extends Writable> partitionStatsClass =
+    Class<? extends PartitionStats> partitionStatsClass =
         masterGraphPartitioner.createPartitionStats().getClass();
     GlobalStats globalStats = new GlobalStats();
     // Get the stats from the all the worker selected nodes
@@ -822,15 +827,15 @@ public class BspServiceMaster<I extends 
         byte [] zkData =
             getZkExt().getData(finishedPath, false, null);
         workerFinishedInfoObj = new JSONObject(new String(zkData));
-        List<? extends Writable> writableList =
+        List<PartitionStats> statsList =
             WritableUtils.readListFieldsFromByteArray(
                 Base64.decode(workerFinishedInfoObj.getString(
                     JSONOBJ_PARTITION_STATS_KEY)),
                     partitionStatsClass,
                     getConfiguration());
-        for (Writable writable : writableList) {
-          globalStats.addPartitionStats((PartitionStats) writable);
-          allPartitionStatsList.add((PartitionStats) writable);
+        for (PartitionStats partitionStats : statsList) {
+          globalStats.addPartitionStats(partitionStats);
+          allPartitionStatsList.add(partitionStats);
         }
         globalStats.addMessageCount(
             workerFinishedInfoObj.getLong(
@@ -986,14 +991,17 @@ public class BspServiceMaster<I extends 
     }
 
     // Workers are waiting for these assignments
-    String partitionAssignmentsPath =
-        getPartitionAssignmentsPath(getApplicationAttempt(),
+    AddressesAndPartitionsWritable addressesAndPartitions =
+        new AddressesAndPartitionsWritable(masterInfo, chosenWorkerInfoList,
+            partitionOwners);
+    String addressesAndPartitionsPath =
+        getAddressesAndPartitionsPath(getApplicationAttempt(),
             getSuperstep());
-    WritableUtils.writeListToZnode(
+    WritableUtils.writeToZnode(
         getZkExt(),
-        partitionAssignmentsPath,
+        addressesAndPartitionsPath,
         -1,
-        new ArrayList<Writable>(partitionOwners));
+        addressesAndPartitions);
   }
 
   /**
@@ -1252,7 +1260,7 @@ public class BspServiceMaster<I extends 
     // 5. Create superstep finished node
     // 6. If the checkpoint frequency is met, finalize the checkpoint
 
-    List<WorkerInfo> chosenWorkerInfoList = checkWorkers();
+    chosenWorkerInfoList = checkWorkers();
     if (chosenWorkerInfoList == null) {
       LOG.fatal("coordinateSuperstep: Not enough healthy workers for " +
           "superstep " + getSuperstep());
@@ -1271,7 +1279,7 @@ public class BspServiceMaster<I extends 
       }
     }
 
-    commService.fixWorkerAddresses(chosenWorkerInfoList);
+    commService.openConnections();
 
     currentWorkersCounter.increment(chosenWorkerInfoList.size() -
         currentWorkersCounter.getValue());

Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspServiceWorker.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspServiceWorker.java?rev=1399453&r1=1399452&r2=1399453&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspServiceWorker.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspServiceWorker.java Wed Oct 17 22:05:47 2012
@@ -107,17 +107,13 @@ public class BspServiceWorker<I extends 
   private final WorkerServer<I, V, E, M> workerServer;
   /** Master info */
   private WorkerInfo masterInfo = new WorkerInfo();
+  /** List of workers */
+  private List<WorkerInfo> workerInfoList = Lists.newArrayList();
   /** Have the partition exchange children (workers) changed? */
   private final BspEvent partitionExchangeChildrenChanged;
 
   /** Worker Context */
   private final WorkerContext workerContext;
-  /** Total vertices loaded */
-  private long totalVerticesLoaded = 0;
-  /** Total edges loaded */
-  private long totalEdgesLoaded = 0;
-  /** Input split max vertices (-1 denotes all) */
-  private final long inputSplitMaxVertices;
   /**
    * Stores and processes the list of InputSplits advertised
    * in a tree of child znodes by the master.
@@ -146,7 +142,6 @@ public class BspServiceWorker<I extends 
     super(serverPortList, sessionMsecTimeout, context, graphMapper);
     partitionExchangeChildrenChanged = new PredicateLock(context);
     registerBspEvent(partitionExchangeChildrenChanged);
-    inputSplitMaxVertices = getConfiguration().getInputSplitMaxVertices();
     workerGraphPartitioner =
         getGraphPartitionerFactory().createWorkerGraphPartitioner();
     workerServer = new NettyWorkerServer<I, V, E, M>(getConfiguration(),
@@ -342,6 +337,11 @@ public class BspServiceWorker<I extends 
   }
 
   @Override
+  public List<WorkerInfo> getWorkerInfoList() {
+    return workerInfoList;
+  }
+
+  @Override
   public FinishedSuperstepStats setup() {
     // Unless doing a restart, prepare for computation:
     // 1. Start superstep INPUT_SUPERSTEP (no computation)
@@ -603,29 +603,24 @@ else[HADOOP_NON_SECURE]*/
 
     registerHealth(getSuperstep());
 
-    String partitionAssignmentsNode =
-        getPartitionAssignmentsPath(getApplicationAttempt(),
+    String addressesAndPartitionsPath =
+        getAddressesAndPartitionsPath(getApplicationAttempt(),
             getSuperstep());
-    Collection<? extends PartitionOwner> masterSetPartitionOwners;
+    AddressesAndPartitionsWritable addressesAndPartitions =
+        new AddressesAndPartitionsWritable(
+            workerGraphPartitioner.createPartitionOwner().getClass());
     try {
-      while (getZkExt().exists(partitionAssignmentsNode, true) ==
+      while (getZkExt().exists(addressesAndPartitionsPath, true) ==
           null) {
-        getPartitionAssignmentsReadyChangedEvent().waitForever();
-        getPartitionAssignmentsReadyChangedEvent().reset();
+        getAddressesAndPartitionsReadyChangedEvent().waitForever();
+        getAddressesAndPartitionsReadyChangedEvent().reset();
       }
-      List<? extends Writable> writableList =
-          WritableUtils.readListFieldsFromZnode(
-              getZkExt(),
-              partitionAssignmentsNode,
-              false,
-              null,
-              workerGraphPartitioner.createPartitionOwner().getClass(),
-              getConfiguration());
-
-      @SuppressWarnings("unchecked")
-      Collection<? extends PartitionOwner> castedWritableList =
-        (Collection<? extends PartitionOwner>) writableList;
-      masterSetPartitionOwners = castedWritableList;
+      WritableUtils.readFieldsFromZnode(
+          getZkExt(),
+          addressesAndPartitionsPath,
+          false,
+          null,
+          addressesAndPartitions);
     } catch (KeeperException e) {
       throw new IllegalStateException(
           "startSuperstep: KeeperException getting assignments", e);
@@ -634,15 +629,15 @@ else[HADOOP_NON_SECURE]*/
           "startSuperstep: InterruptedException getting assignments", e);
     }
 
-    // get address of master
-    WritableUtils.readFieldsFromZnode(getZkExt(), currentMasterPath, false,
-        null, masterInfo);
+    workerInfoList.clear();
+    workerInfoList = addressesAndPartitions.getWorkerInfos();
+    masterInfo = addressesAndPartitions.getMasterInfo();
 
     if (LOG.isInfoEnabled()) {
       LOG.info("startSuperstep: Ready for computation on superstep " +
           getSuperstep() + " since worker " +
           "selection and vertex range assignments are done in " +
-          partitionAssignmentsNode);
+          addressesAndPartitionsPath);
     }
 
     if (getSuperstep() != INPUT_SUPERSTEP) {
@@ -652,7 +647,7 @@ else[HADOOP_NON_SECURE]*/
         getGraphMapper().getMapFunctions().toString() +
         " - Attempt=" + getApplicationAttempt() +
         ", Superstep=" + getSuperstep());
-    return masterSetPartitionOwners;
+    return addressesAndPartitions.getPartitionOwners();
   }
 
   @Override
@@ -1151,7 +1146,7 @@ else[HADOOP_NON_SECURE]*/
     PartitionExchange partitionExchange =
         workerGraphPartitioner.updatePartitionOwners(
             getWorkerInfo(), masterSetPartitionOwners, getPartitionStore());
-    workerClient.openConnections(getPartitionOwners());
+    workerClient.openConnections();
 
     Map<WorkerInfo, List<Integer>> sendWorkerPartitionMap =
         partitionExchange.getSendWorkerPartitionMap();
@@ -1263,11 +1258,6 @@ else[HADOOP_NON_SECURE]*/
   }
 
   @Override
-  public Collection<? extends PartitionOwner> getPartitionOwners() {
-    return workerGraphPartitioner.getPartitionOwners();
-  }
-
-  @Override
   public PartitionOwner getVertexPartitionOwner(I vertexId) {
     return workerGraphPartitioner.getPartitionOwner(vertexId);
   }

Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/InputSplitsCallable.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/InputSplitsCallable.java?rev=1399453&r1=1399452&r2=1399453&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/InputSplitsCallable.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/InputSplitsCallable.java Wed Oct 17 22:05:47 2012
@@ -55,7 +55,7 @@ import org.apache.zookeeper.data.Stat;
  */
 public class InputSplitsCallable<I extends WritableComparable,
     V extends Writable, E extends Writable, M extends Writable>
-    implements Callable {
+    implements Callable<VertexEdgeCount> {
   /** Class logger */
   private static final Logger LOG =
       Logger.getLogger(InputSplitsCallable.class);

Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/WorkerInfo.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/WorkerInfo.java?rev=1399453&r1=1399452&r2=1399453&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/WorkerInfo.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/WorkerInfo.java Wed Oct 17 22:05:47 2012
@@ -58,6 +58,16 @@ public class WorkerInfo implements Writa
     this.hostnameId = hostname + "_" + taskId;
   }
 
+  /**
+   * Constructor with InetSocketAddress
+   *
+   * @param address Address of this worker
+   * @param taskId The task partition for this worker
+   */
+  public WorkerInfo(InetSocketAddress address, int taskId) {
+    this(address.getHostName(), taskId, address.getPort());
+  }
+
   public String getHostname() {
     return hostname;
   }

Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/WritableUtils.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/WritableUtils.java?rev=1399453&r1=1399452&r2=1399453&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/WritableUtils.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/WritableUtils.java Wed Oct 17 22:05:47 2012
@@ -193,24 +193,25 @@ public class WritableUtils {
   }
 
   /**
-   * Read fields from byteArray to a list of Writable objects.
+   * Read fields from byteArray to a list of objects.
    *
    * @param byteArray Byte array to find the fields in.
    * @param writableClass Class of the objects to instantiate.
    * @param conf Configuration used for instantiation (i.e Configurable)
-   * @return List of writable objects.
+   * @param <T> Object type
+   * @return List of objects.
    */
-  public static List<? extends Writable> readListFieldsFromByteArray(
+  public static <T extends Writable> List<T> readListFieldsFromByteArray(
       byte[] byteArray,
-      Class<? extends Writable> writableClass,
+      Class<? extends T> writableClass,
       Configuration conf) {
     try {
       DataInputStream inputStream =
           new DataInputStream(new ByteArrayInputStream(byteArray));
       int size = inputStream.readInt();
-      List<Writable> writableList = new ArrayList<Writable>(size);
+      List<T> writableList = new ArrayList<T>(size);
       for (int i = 0; i < size; ++i) {
-        Writable writable =
+        T writable =
             ReflectionUtils.newInstance(writableClass, conf);
         writable.readFields(inputStream);
         writableList.add(writable);
@@ -231,18 +232,20 @@ public class WritableUtils {
    * @param stat Stat of znode if desired.
    * @param writableClass Class of the objects to instantiate.
    * @param conf Configuration used for instantiation (i.e Configurable)
-   * @return List of writable objects.
+   * @param <T> Object type
+   * @return List of objects.
    */
-  public static List<? extends Writable> readListFieldsFromZnode(
+  public static <T extends Writable> List<T> readListFieldsFromZnode(
       ZooKeeperExt zkExt,
       String zkPath,
       boolean watch,
       Stat stat,
-      Class<? extends Writable> writableClass,
+      Class<? extends T> writableClass,
       Configuration conf) {
     try {
       byte[] zkData = zkExt.getData(zkPath, false, stat);
-      return readListFieldsFromByteArray(zkData, writableClass, conf);
+      return WritableUtils.<T>readListFieldsFromByteArray(zkData,
+          writableClass, conf);
     } catch (KeeperException e) {
       throw new IllegalStateException(
           "readListFieldsFromZnode: KeeperException on " + zkPath, e);

Modified: giraph/trunk/giraph/src/test/java/org/apache/giraph/comm/ConnectionTest.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/test/java/org/apache/giraph/comm/ConnectionTest.java?rev=1399453&r1=1399452&r2=1399453&view=diff
==============================================================================
--- giraph/trunk/giraph/src/test/java/org/apache/giraph/comm/ConnectionTest.java (original)
+++ giraph/trunk/giraph/src/test/java/org/apache/giraph/comm/ConnectionTest.java Wed Oct 17 22:05:47 2012
@@ -18,8 +18,9 @@
 
 package org.apache.giraph.comm;
 
-import com.google.common.collect.Maps;
-import java.util.Map;
+import com.google.common.collect.Lists;
+
+import java.util.List;
 import org.apache.giraph.GiraphConfiguration;
 import org.apache.giraph.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.comm.messages.SimpleMessageStore;
@@ -28,6 +29,7 @@ import org.apache.giraph.comm.netty.Nett
 import org.apache.giraph.comm.netty.NettyServer;
 import org.apache.giraph.comm.netty.handler.WorkerRequestServerHandler;
 import org.apache.giraph.graph.EdgeListVertex;
+import org.apache.giraph.graph.WorkerInfo;
 import org.apache.giraph.utils.MockUtils;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.mapreduce.Mapper.Context;
@@ -38,8 +40,6 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.Collections;
 
 /**
  * Test the netty connections
@@ -85,9 +85,9 @@ public class ConnectionTest {
     server.start();
 
     NettyClient client = new NettyClient(context, conf);
-    Map<InetSocketAddress, Integer> addressIdMap = Maps.newHashMap();
-    addressIdMap.put(server.getMyAddress(), -1);
-    client.connectAllAddresses(addressIdMap);
+    client.connectAllAddresses(
+        Lists.<WorkerInfo>newArrayList(
+            new WorkerInfo(server.getMyAddress(), -1)));
 
     client.stop();
     server.stop();
@@ -121,11 +121,12 @@ public class ConnectionTest {
     server3.start();
 
     NettyClient client = new NettyClient(context, conf);
-    Map<InetSocketAddress, Integer> addressIdMap = Maps.newHashMap();
-    addressIdMap.put(server1.getMyAddress(), -1);
-    addressIdMap.put(server2.getMyAddress(), -1);
-    addressIdMap.put(server3.getMyAddress(), -1);
-    client.connectAllAddresses(addressIdMap);
+    WorkerInfo workerInfo1 = new WorkerInfo(server1.getMyAddress(), 1);
+    WorkerInfo workerInfo2 = new WorkerInfo(server2.getMyAddress(), 2);
+    WorkerInfo workerInfo3 = new WorkerInfo(server3.getMyAddress(), 3);
+    List<WorkerInfo> addresses = Lists.<WorkerInfo>newArrayList(workerInfo1,
+        workerInfo2, workerInfo3);
+    client.connectAllAddresses(addresses);
 
     client.stop();
     server1.stop();
@@ -154,14 +155,15 @@ public class ConnectionTest {
         new WorkerRequestServerHandler.Factory(serverData));
     server.start();
 
-    Map<InetSocketAddress, Integer> addressIdMap = Maps.newHashMap();
-    addressIdMap.put(server.getMyAddress(), -1);
+    List<WorkerInfo> addresses =
+        Lists.<WorkerInfo>newArrayList(
+            new WorkerInfo(server.getMyAddress(), -1));
     NettyClient client1 = new NettyClient(context, conf);
-    client1.connectAllAddresses(addressIdMap);
+    client1.connectAllAddresses(addresses);
     NettyClient client2 = new NettyClient(context, conf);
-    client2.connectAllAddresses(addressIdMap);
+    client2.connectAllAddresses(addresses);
     NettyClient client3 = new NettyClient(context, conf);
-    client3.connectAllAddresses(addressIdMap);
+    client3.connectAllAddresses(addresses);
 
     client1.stop();
     client2.stop();

Modified: giraph/trunk/giraph/src/test/java/org/apache/giraph/comm/RequestFailureTest.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/test/java/org/apache/giraph/comm/RequestFailureTest.java?rev=1399453&r1=1399452&r2=1399453&view=diff
==============================================================================
--- giraph/trunk/giraph/src/test/java/org/apache/giraph/comm/RequestFailureTest.java (original)
+++ giraph/trunk/giraph/src/test/java/org/apache/giraph/comm/RequestFailureTest.java Wed Oct 17 22:05:47 2012
@@ -18,7 +18,6 @@
 
 package org.apache.giraph.comm;
 
-import java.net.InetSocketAddress;
 import org.apache.giraph.GiraphConfiguration;
 import org.apache.giraph.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.comm.messages.SimpleMessageStore;
@@ -28,6 +27,7 @@ import org.apache.giraph.comm.netty.hand
 import org.apache.giraph.comm.requests.SendWorkerMessagesRequest;
 import org.apache.giraph.comm.requests.WritableRequest;
 import org.apache.giraph.graph.EdgeListVertex;
+import org.apache.giraph.graph.WorkerInfo;
 import org.apache.giraph.utils.MockUtils;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.mapreduce.Mapper.Context;
@@ -43,7 +43,6 @@ import com.google.common.collect.Maps;
 
 import java.io.IOException;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.Map;
 
 /**
@@ -141,15 +140,15 @@ public class RequestFailureTest {
         new WorkerRequestServerHandler.Factory(serverData));
     server.start();
     client = new NettyClient(context, conf);
-    Map<InetSocketAddress, Integer> addressIdMap = Maps.newHashMap();
-    addressIdMap.put(server.getMyAddress(), -1);
-    client.connectAllAddresses(addressIdMap);
+    client.connectAllAddresses(
+        Lists.<WorkerInfo>newArrayList(
+            new WorkerInfo(server.getMyAddress(), -1)));
 
     // Send the request 2x
     WritableRequest request1 = getRequest();
     WritableRequest request2 = getRequest();
-    client.sendWritableRequest(-1, server.getMyAddress(), request1);
-    client.sendWritableRequest(-1, server.getMyAddress(), request2);
+    client.sendWritableRequest(-1, request1);
+    client.sendWritableRequest(-1, request2);
     client.waitAllRequests();
 
     // Stop the service
@@ -179,15 +178,15 @@ public class RequestFailureTest {
         new WorkerRequestServerHandler.Factory(serverData));
     server.start();
     client = new NettyClient(context, conf);
-    Map<InetSocketAddress, Integer> addressIdMap = Maps.newHashMap();
-    addressIdMap.put(server.getMyAddress(), -1);
-    client.connectAllAddresses(addressIdMap);
+    client.connectAllAddresses(
+        Lists.<WorkerInfo>newArrayList(
+            new WorkerInfo(server.getMyAddress(), -1)));
 
     // Send the request 2x, but should only be processed once
     WritableRequest request1 = getRequest();
     WritableRequest request2 = getRequest();
-    client.sendWritableRequest(-1, server.getMyAddress(), request1);
-    client.sendWritableRequest(-1, server.getMyAddress(), request2);
+    client.sendWritableRequest(-1, request1);
+    client.sendWritableRequest(-1, request2);
     client.waitAllRequests();
 
     // Stop the service
@@ -216,15 +215,15 @@ public class RequestFailureTest {
         new WorkerRequestServerHandler.Factory(serverData));
     server.start();
     client = new NettyClient(context, conf);
-    Map<InetSocketAddress, Integer> addressIdMap = Maps.newHashMap();
-    addressIdMap.put(server.getMyAddress(), -1);
-    client.connectAllAddresses(addressIdMap);
+    client.connectAllAddresses(
+        Lists.<WorkerInfo>newArrayList(
+            new WorkerInfo(server.getMyAddress(), -1)));
 
     // Send the request 2x, but should only be processed once
     WritableRequest request1 = getRequest();
     WritableRequest request2 = getRequest();
-    client.sendWritableRequest(-1, server.getMyAddress(), request1);
-    client.sendWritableRequest(-1, server.getMyAddress(), request2);
+    client.sendWritableRequest(-1, request1);
+    client.sendWritableRequest(-1, request2);
     client.waitAllRequests();
 
     // Stop the service

Modified: giraph/trunk/giraph/src/test/java/org/apache/giraph/comm/RequestTest.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/test/java/org/apache/giraph/comm/RequestTest.java?rev=1399453&r1=1399452&r2=1399453&view=diff
==============================================================================
--- giraph/trunk/giraph/src/test/java/org/apache/giraph/comm/RequestTest.java (original)
+++ giraph/trunk/giraph/src/test/java/org/apache/giraph/comm/RequestTest.java Wed Oct 17 22:05:47 2012
@@ -18,7 +18,6 @@
 
 package org.apache.giraph.comm;
 
-import java.net.InetSocketAddress;
 import org.apache.giraph.GiraphConfiguration;
 import org.apache.giraph.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.comm.messages.SimpleMessageStore;
@@ -32,6 +31,7 @@ import org.apache.giraph.graph.Edge;
 import org.apache.giraph.graph.EdgeListVertex;
 import org.apache.giraph.graph.Vertex;
 import org.apache.giraph.graph.VertexMutations;
+import org.apache.giraph.graph.WorkerInfo;
 import org.apache.giraph.graph.partition.PartitionStore;
 import org.apache.giraph.utils.MockUtils;
 import org.apache.hadoop.io.IntWritable;
@@ -50,7 +50,6 @@ import com.google.common.collect.Maps;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.concurrent.ConcurrentHashMap;
@@ -102,9 +101,9 @@ public class RequestTest {
         new WorkerRequestServerHandler.Factory(serverData));
     server.start();
     client = new NettyClient(context, conf);
-    Map<InetSocketAddress, Integer> addressIdMap = Maps.newHashMap();
-    addressIdMap.put(server.getMyAddress(), -1);
-    client.connectAllAddresses(addressIdMap);
+    client.connectAllAddresses(
+        Lists.<WorkerInfo>newArrayList(
+            new WorkerInfo(server.getMyAddress(), -1)));
   }
 
   @Test
@@ -126,7 +125,7 @@ public class RequestTest {
     IntWritable> request =
       new SendVertexRequest<IntWritable, IntWritable,
       IntWritable, IntWritable>(partitionId, vertices);
-    client.sendWritableRequest(-1, server.getMyAddress(), request);
+    client.sendWritableRequest(-1, request);
     client.waitAllRequests();
 
     // Stop the service
@@ -170,7 +169,7 @@ public class RequestTest {
         IntWritable> request =
       new SendWorkerMessagesRequest<IntWritable, IntWritable,
             IntWritable, IntWritable>(sendMap);
-    client.sendWritableRequest(-1, server.getMyAddress(), request);
+    client.sendWritableRequest(-1, request);
     client.waitAllRequests();
 
     // Stop the service
@@ -233,7 +232,7 @@ public class RequestTest {
     IntWritable> request =
       new SendPartitionMutationsRequest<IntWritable, IntWritable,
       IntWritable, IntWritable>(partitionId, vertexIdMutations);
-    client.sendWritableRequest(-1, server.getMyAddress(), request);
+    client.sendWritableRequest(-1, request);
     client.waitAllRequests();
 
     // Stop the service

Modified: giraph/trunk/giraph/src/test/java/org/apache/giraph/comm/SaslConnectionTest.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/test/java/org/apache/giraph/comm/SaslConnectionTest.java?rev=1399453&r1=1399452&r2=1399453&view=diff
==============================================================================
--- giraph/trunk/giraph/src/test/java/org/apache/giraph/comm/SaslConnectionTest.java (original)
+++ giraph/trunk/giraph/src/test/java/org/apache/giraph/comm/SaslConnectionTest.java Wed Oct 17 22:05:47 2012
@@ -18,9 +18,7 @@
 
 package org.apache.giraph.comm;
 
-import com.google.common.collect.Maps;
-import java.net.InetSocketAddress;
-import java.util.Map;
+import com.google.common.collect.Lists;
 import org.apache.giraph.GiraphConfiguration;
 import org.apache.giraph.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.comm.messages.SimpleMessageStore;
@@ -29,6 +27,7 @@ import org.apache.giraph.comm.netty.Nett
 import org.apache.giraph.comm.netty.handler.SaslServerHandler;
 import org.apache.giraph.comm.netty.handler.WorkerRequestServerHandler;
 import org.apache.giraph.graph.EdgeListVertex;
+import org.apache.giraph.graph.WorkerInfo;
 import org.apache.giraph.utils.MockUtils;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.mapreduce.Mapper.Context;
@@ -96,9 +95,9 @@ public class SaslConnectionTest {
     server.start();
 
     NettyClient client = new NettyClient(context, conf);
-    Map<InetSocketAddress, Integer> addressIdMap = Maps.newHashMap();
-    addressIdMap.put(server.getMyAddress(), -1);
-    client.connectAllAddresses(addressIdMap);
+    client.connectAllAddresses(
+        Lists.<WorkerInfo>newArrayList(
+            new WorkerInfo(server.getMyAddress(), -1)));
 
     client.stop();
     server.stop();