You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ma...@apache.org on 2012/10/18 00:05:48 UTC
svn commit: r1399453 - in /giraph/trunk: ./
giraph/src/main/java/org/apache/giraph/bsp/
giraph/src/main/java/org/apache/giraph/comm/
giraph/src/main/java/org/apache/giraph/comm/netty/
giraph/src/main/java/org/apache/giraph/graph/ giraph/src/main/java/o...
Author: maja
Date: Wed Oct 17 22:05:47 2012
New Revision: 1399453
URL: http://svn.apache.org/viewvc?rev=1399453&view=rev
Log:
GIRAPH-372: Write worker addresses to Zookeeper; move addresses and resolution to NettyClient
Added:
giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/AddressesAndPartitionsWritable.java
Modified:
giraph/trunk/CHANGELOG
giraph/trunk/giraph/src/main/java/org/apache/giraph/bsp/CentralizedService.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/MasterClient.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/WorkerClient.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyClient.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyMasterClient.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyMasterClientServer.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClient.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspService.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspServiceMaster.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspServiceWorker.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/InputSplitsCallable.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/WorkerInfo.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/WritableUtils.java
giraph/trunk/giraph/src/test/java/org/apache/giraph/comm/ConnectionTest.java
giraph/trunk/giraph/src/test/java/org/apache/giraph/comm/RequestFailureTest.java
giraph/trunk/giraph/src/test/java/org/apache/giraph/comm/RequestTest.java
giraph/trunk/giraph/src/test/java/org/apache/giraph/comm/SaslConnectionTest.java
Modified: giraph/trunk/CHANGELOG
URL: http://svn.apache.org/viewvc/giraph/trunk/CHANGELOG?rev=1399453&r1=1399452&r2=1399453&view=diff
==============================================================================
--- giraph/trunk/CHANGELOG (original)
+++ giraph/trunk/CHANGELOG Wed Oct 17 22:05:47 2012
@@ -1,6 +1,9 @@
Giraph Change Log
Release 0.2.0 - unreleased
+ GIRAPH-372: Write worker addresses to Zookeeper;
+ move addresses and resolution to NettyClient (majakabiljo)
+
GIRAPH-373: RandomMessageBenchmark is broken (majakabiljo).
GIRAPH-374: Multithreading in input split loading and compute (aching).
Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/bsp/CentralizedService.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/bsp/CentralizedService.java?rev=1399453&r1=1399452&r2=1399453&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/bsp/CentralizedService.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/bsp/CentralizedService.java Wed Oct 17 22:05:47 2012
@@ -18,10 +18,12 @@
package org.apache.giraph.bsp;
+import org.apache.giraph.graph.WorkerInfo;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import java.io.IOException;
+import java.util.List;
/**
* Basic service interface shared by both {@link CentralizedServiceMaster} and
@@ -61,6 +63,13 @@ public interface CentralizedService<I ex
boolean checkpointFrequencyMet(long superstep);
/**
+ * Get list of workers
+ *
+ * @return List of workers
+ */
+ List<WorkerInfo> getWorkerInfoList();
+
+ /**
* Clean up the service (no calls may be issued after this)
*
* @throws IOException
Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java?rev=1399453&r1=1399452&r2=1399453&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java Wed Oct 17 22:05:47 2012
@@ -93,13 +93,6 @@ public interface CentralizedServiceWorke
PartitionStore<I, V, E, M> getPartitionStore();
/**
- * Get a collection of all the partition owners.
- *
- * @return Collection of all the partition owners.
- */
- Collection<? extends PartitionOwner> getPartitionOwners();
-
- /**
* Both the vertices and the messages need to be checkpointed in order
* for them to be used. This is done after all messages have been
* delivered, but prior to a superstep starting.
Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/MasterClient.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/MasterClient.java?rev=1399453&r1=1399452&r2=1399453&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/MasterClient.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/MasterClient.java Wed Oct 17 22:05:47 2012
@@ -18,18 +18,14 @@
package org.apache.giraph.comm;
-import org.apache.giraph.graph.WorkerInfo;
-
/**
* Interface for master to send messages to workers
*/
public interface MasterClient {
/**
- * Fix workers for current superstep
- *
- * @param workers Information about workers
+ * Make sure that all the connections to workers have been established.
*/
- void fixWorkerAddresses(Iterable<WorkerInfo> workers);
+ void openConnections();
/**
* Flush all outgoing messages. This will synchronously ensure that all
Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/WorkerClient.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/WorkerClient.java?rev=1399453&r1=1399452&r2=1399453&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/WorkerClient.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/WorkerClient.java Wed Oct 17 22:05:47 2012
@@ -18,11 +18,8 @@
package org.apache.giraph.comm;
-import java.net.InetSocketAddress;
-import java.util.Collection;
import org.apache.giraph.comm.requests.WritableRequest;
-import org.apache.giraph.graph.WorkerInfo;
import org.apache.giraph.graph.partition.PartitionOwner;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
@@ -66,34 +63,18 @@ else[HADOOP_NON_SECURE]*/
PartitionOwner getVertexPartitionOwner(I vertexId);
/**
- * Make sure that all the connections to the partitions owners have been
+ * Make sure that all the connections to workers and master have been
* established.
- *
- * @param partitionOwners Partition owners to establish/check connections
- */
- void openConnections(
- Collection<? extends PartitionOwner> partitionOwners);
-
- /**
- * Fill the socket address cache for the worker info and its partition.
- *
- * @param workerInfo Worker information to get the socket address
- * @param partitionId Partition id to look up.
- * @return address of the vertex range server containing this vertex
*/
- InetSocketAddress getInetSocketAddress(WorkerInfo workerInfo,
- int partitionId);
+ void openConnections();
/**
* Send a request to a remote server (should be already connected)
*
* @param destWorkerId Destination worker id
- * @param remoteServer Server to send the request to
* @param request Request to send
*/
- void sendWritableRequest(Integer destWorkerId,
- InetSocketAddress remoteServer,
- WritableRequest request);
+ void sendWritableRequest(Integer destWorkerId, WritableRequest request);
/**
* Wait until all the outstanding requests are completed.
Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyClient.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyClient.java?rev=1399453&r1=1399452&r2=1399453&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyClient.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyClient.java Wed Oct 17 22:05:47 2012
@@ -24,6 +24,7 @@ import com.google.common.util.concurrent
import java.io.IOException;
import java.net.InetSocketAddress;
+import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
@@ -47,6 +48,7 @@ import org.apache.giraph.comm.requests.R
import org.apache.giraph.comm.requests.SaslTokenMessageRequest;
/*end[HADOOP_NON_SECURE]*/
import org.apache.giraph.comm.requests.WritableRequest;
+import org.apache.giraph.graph.WorkerInfo;
import org.apache.giraph.utils.TimedLogger;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.log4j.Logger;
@@ -102,6 +104,11 @@ else[HADOOP_NON_SECURE]*/
private final ConcurrentMap<InetSocketAddress, ChannelRotater>
addressChannelMap = new MapMaker().makeMap();
/**
+ * Map from task id to address of its server
+ */
+ private final Map<Integer, InetSocketAddress> taskIdAddressMap =
+ new MapMaker().makeMap();
+ /**
* Request map of client request ids to request information.
*/
private final ConcurrentMap<ClientRequestId, RequestInfo>
@@ -122,8 +129,6 @@ else[HADOOP_NON_SECURE]*/
private final int maxConnectionFailures;
/** Maximum number of milliseconds for a request */
private final int maxRequestMilliseconds;
- /** Maximum number of reconnection failures */
- private final int maxReconnectionFailures;
/** Waiting internal for checking outstanding requests msecs */
private final int waitingRequestMsecs;
/** Timed logger for printing request debugging */
@@ -139,6 +144,8 @@ else[HADOOP_NON_SECURE]*/
private final int clientId;
/** Maximum thread pool size */
private final int maxPoolSize;
+ /** Maximum number of attempts to resolve an address*/
+ private final int maxResolveAddressAttempts;
/** Execution handler (if used) */
private final ExecutionHandler executionHandler;
/** Name of the handler before the execution handler (if used) */
@@ -186,10 +193,6 @@ else[HADOOP_NON_SECURE]*/
GiraphConfiguration.NETTY_MAX_CONNECTION_FAILURES,
GiraphConfiguration.NETTY_MAX_CONNECTION_FAILURES_DEFAULT);
- maxReconnectionFailures = conf.getInt(
- GiraphConfiguration.MAX_RECONNECT_ATTEMPTS,
- GiraphConfiguration.MAX_RECONNECT_ATTEMPTS_DEFAULT);
-
waitingRequestMsecs = conf.getInt(
GiraphConfiguration.WAITING_REQUEST_MSECS,
GiraphConfiguration.WAITING_REQUEST_MSECS_DEFAULT);
@@ -198,6 +201,10 @@ else[HADOOP_NON_SECURE]*/
GiraphConfiguration.NETTY_CLIENT_THREADS,
GiraphConfiguration.NETTY_CLIENT_THREADS_DEFAULT);
+ maxResolveAddressAttempts = conf.getInt(
+ GiraphConfiguration.MAX_RESOLVE_ADDRESS_ATTEMPTS,
+ GiraphConfiguration.MAX_RESOLVE_ADDRESS_ATTEMPTS_DEFAULT);
+
clientRequestIdRequestInfoMap =
new MapMaker().concurrencyLevel(maxPoolSize).makeMap();
@@ -342,20 +349,27 @@ else[HADOOP_NON_SECURE]*/
}
/**
- * Connect to a collection of addresses
+ * Connect to a collection of tasks servers
*
- * @param addresses Addresses to connect to (if haven't already connected)
+ * @param tasks Tasks to connect to (if haven't already connected)
*/
- public void connectAllAddresses(Map<InetSocketAddress, Integer> addresses) {
+ public void connectAllAddresses(Collection<WorkerInfo> tasks) {
List<ChannelFutureAddress> waitingConnectionList =
- Lists.newArrayListWithCapacity(addresses.size() * channelsPerServer);
- for (Map.Entry<InetSocketAddress, Integer> entry : addresses.entrySet()) {
+ Lists.newArrayListWithCapacity(tasks.size() * channelsPerServer);
+ for (WorkerInfo taskInfo : tasks) {
context.progress();
- InetSocketAddress address = entry.getKey();
+ InetSocketAddress address = taskIdAddressMap.get(taskInfo.getTaskId());
+ if (address == null ||
+ !address.getHostName().equals(taskInfo.getHostname()) ||
+ address.getPort() != taskInfo.getPort()) {
+ address = resolveAddress(maxResolveAddressAttempts,
+ taskInfo.getInetSocketAddress());
+ taskIdAddressMap.put(taskInfo.getTaskId(), address);
+ }
if (address == null || address.getHostName() == null ||
address.getHostName().isEmpty()) {
throw new IllegalStateException("connectAllAddresses: Null address " +
- "in addresses " + addresses);
+ "in addresses " + tasks);
}
if (address.isUnresolved()) {
throw new IllegalStateException("connectAllAddresses: Unresolved " +
@@ -372,7 +386,7 @@ else[HADOOP_NON_SECURE]*/
waitingConnectionList.add(
new ChannelFutureAddress(
- connectionFuture, address, entry.getValue()));
+ connectionFuture, address, taskInfo.getTaskId()));
}
}
@@ -490,9 +504,7 @@ else[HADOOP_NON_SECURE]*/
"to complete..");
}
SaslTokenMessageRequest saslTokenMessage = saslNettyClient.firstToken();
- sendWritableRequest(taskId, (InetSocketAddress) channel
- .getRemoteAddress(),
- saslTokenMessage);
+ sendWritableRequest(taskId, saslTokenMessage);
// We now wait for Netty's thread pool to communicate over this
// channel to authenticate with another worker acting as a server.
try {
@@ -611,12 +623,11 @@ else[HADOOP_NON_SECURE]*/
* Send a request to a remote server (should be already connected)
*
* @param destWorkerId Destination worker id
- * @param remoteServer Server to send the request to
* @param request Request to send
*/
public void sendWritableRequest(Integer destWorkerId,
- InetSocketAddress remoteServer,
- WritableRequest request) {
+ WritableRequest request) {
+ InetSocketAddress remoteServer = taskIdAddressMap.get(destWorkerId);
if (clientRequestIdRequestInfoMap.isEmpty()) {
byteCounter.resetAll();
}
@@ -756,4 +767,38 @@ else[HADOOP_NON_SECURE]*/
addedRequestInfos.clear();
}
}
+
+ /**
+ * Utility method for resolving addresses (retries with a 5 second sleep)
+ *
+ * @param maxResolveAddressAttempts Maximum number of attempts to resolve the
+ * address
+ * @param address The address we are attempting to resolve
+ * @return The successfully resolved address.
+ * @throws IllegalStateException if the address is still unresolved after
+ * <code>maxResolveAddressAttempts</code> tries.
+ */
+ private static InetSocketAddress resolveAddress(
+ int maxResolveAddressAttempts, InetSocketAddress address) {
+ int resolveAttempts = 0;
+ while (address.isUnresolved() &&
+ resolveAttempts < maxResolveAddressAttempts) {
+ ++resolveAttempts;
+ LOG.warn("resolveAddress: Failed to resolve " + address +
+ " on attempt " + resolveAttempts + " of " +
+ maxResolveAddressAttempts + " attempts, sleeping for 5 seconds");
+ try {
+ Thread.sleep(5000);
+ } catch (InterruptedException e) {
+ LOG.warn("resolveAddress: Interrupted.", e);
+ }
+ address = new InetSocketAddress(address.getHostName(),
+ address.getPort());
+ }
+ if (address.isUnresolved()) { // the last attempt may have succeeded
+ throw new IllegalStateException("resolveAddress: Couldn't " +
+ "resolve " + address + " in " + resolveAttempts + " tries.");
+ }
+ return address;
+ }
}
Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyMasterClient.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyMasterClient.java?rev=1399453&r1=1399452&r2=1399453&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyMasterClient.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyMasterClient.java Wed Oct 17 22:05:47 2012
@@ -18,19 +18,11 @@
package org.apache.giraph.comm.netty;
-import com.google.common.collect.Maps;
-import java.util.Map;
import org.apache.giraph.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.bsp.CentralizedServiceMaster;
import org.apache.giraph.comm.MasterClient;
-import org.apache.giraph.graph.WorkerInfo;
import org.apache.hadoop.mapreduce.Mapper;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
-
-import java.net.InetSocketAddress;
-import java.util.Collection;
-
/**
* Netty implementation of {@link MasterClient}
*/
@@ -38,30 +30,25 @@ public class NettyMasterClient implement
/** Netty client that does the actual I/O */
private final NettyClient nettyClient;
/** Worker information for current superstep */
- private Collection<WorkerInfo> workers;
+ private CentralizedServiceMaster<?, ?, ?, ?> service;
/**
* Constructor
*
* @param context Context from mapper
* @param configuration Configuration
+ * @param service Centralized service
*/
public NettyMasterClient(Mapper<?, ?, ?, ?>.Context context,
- ImmutableClassesGiraphConfiguration configuration) {
+ ImmutableClassesGiraphConfiguration configuration,
+ CentralizedServiceMaster<?, ?, ?, ?> service) {
this.nettyClient = new NettyClient(context, configuration);
- workers = Lists.newArrayList();
+ this.service = service;
}
@Override
- public void fixWorkerAddresses(Iterable<WorkerInfo> workers) {
- this.workers.clear();
- Iterables.addAll(this.workers, workers);
- Map<InetSocketAddress, Integer> addresses =
- Maps.newHashMapWithExpectedSize(this.workers.size());
- for (WorkerInfo worker : workers) {
- addresses.put(worker.getInetSocketAddress(), worker.getTaskId());
- }
- nettyClient.connectAllAddresses(addresses);
+ public void openConnections() {
+ nettyClient.connectAllAddresses(service.getWorkerInfoList());
}
@Override
Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyMasterClientServer.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyMasterClientServer.java?rev=1399453&r1=1399452&r2=1399453&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyMasterClientServer.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyMasterClientServer.java Wed Oct 17 22:05:47 2012
@@ -19,10 +19,10 @@
package org.apache.giraph.comm.netty;
import org.apache.giraph.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.bsp.CentralizedServiceMaster;
import org.apache.giraph.comm.MasterClient;
import org.apache.giraph.comm.MasterClientServer;
import org.apache.giraph.comm.MasterServer;
-import org.apache.giraph.graph.WorkerInfo;
import org.apache.hadoop.mapreduce.Mapper;
import java.net.InetSocketAddress;
@@ -41,17 +41,19 @@ public class NettyMasterClientServer imp
*
* @param context Mapper context
* @param configuration Configuration
+ * @param service Centralized service
*/
public NettyMasterClientServer(
Mapper<?, ?, ?, ?>.Context context,
- ImmutableClassesGiraphConfiguration configuration) {
- client = new NettyMasterClient(context, configuration);
+ ImmutableClassesGiraphConfiguration configuration,
+ CentralizedServiceMaster<?, ?, ?, ?> service) {
+ client = new NettyMasterClient(context, configuration, service);
server = new NettyMasterServer(configuration);
}
@Override
- public void fixWorkerAddresses(Iterable<WorkerInfo> workers) {
- client.fixWorkerAddresses(workers);
+ public void openConnections() {
+ client.openConnections();
}
@Override
Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClient.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClient.java?rev=1399453&r1=1399452&r2=1399453&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClient.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClient.java Wed Oct 17 22:05:47 2012
@@ -18,8 +18,6 @@
package org.apache.giraph.comm.netty;
-import java.util.concurrent.ConcurrentMap;
-import org.apache.giraph.GiraphConfiguration;
import org.apache.giraph.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.bsp.CentralizedServiceWorker;
import org.apache.giraph.comm.WorkerClient;
@@ -31,13 +29,10 @@ import org.apache.hadoop.io.WritableComp
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.log4j.Logger;
-import com.google.common.collect.Maps;
+import com.google.common.collect.Lists;
import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.Collection;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
+import java.util.List;
/**
* Takes users facing APIs in {@link WorkerClient} and implements them
@@ -52,8 +47,6 @@ import java.util.concurrent.ConcurrentHa
public class NettyWorkerClient<I extends WritableComparable,
V extends Writable, E extends Writable, M extends Writable> implements
WorkerClient<I, V, E, M> {
- /** Signal for getInetSocketAddress() to use WorkerInfo's address */
- public static final int NO_PARTITION_ID = Integer.MIN_VALUE;
/** Class logger */
private static final Logger LOG = Logger.getLogger(NettyWorkerClient.class);
/** Hadoop configuration */
@@ -62,14 +55,6 @@ public class NettyWorkerClient<I extends
private final NettyClient nettyClient;
/** Centralized service, needed to get vertex ranges */
private final CentralizedServiceWorker<I, V, E, M> service;
- /**
- * Cached map of partition ids to remote socket address.
- */
- private final ConcurrentMap<Integer, InetSocketAddress>
- partitionIndexAddressMap =
- new ConcurrentHashMap<Integer, InetSocketAddress>();
- /** Maximum number of attempts to resolve an address*/
- private final int maxResolveAddressAttempts;
/**
* Only constructor.
@@ -85,10 +70,6 @@ public class NettyWorkerClient<I extends
this.nettyClient = new NettyClient(context, configuration);
this.conf = configuration;
this.service = service;
-
- maxResolveAddressAttempts = conf.getInt(
- GiraphConfiguration.MAX_RESOLVE_ADDRESS_ATTEMPTS,
- GiraphConfiguration.MAX_RESOLVE_ADDRESS_ATTEMPTS_DEFAULT);
}
public CentralizedServiceWorker<I, V, E, M> getService() {
@@ -96,94 +77,17 @@ public class NettyWorkerClient<I extends
}
@Override
- public void openConnections(
- Collection<? extends PartitionOwner> partitionOwners) {
- // 1. Fix all the cached inet addresses (remove all changed entries)
- // 2. Connect to any new IPC servers
- Map<InetSocketAddress, Integer> addressTaskIdMap =
- Maps.newHashMapWithExpectedSize(partitionOwners.size());
- for (PartitionOwner partitionOwner : partitionOwners) {
- InetSocketAddress address =
- partitionIndexAddressMap.get(
- partitionOwner.getPartitionId());
- if (address != null &&
- (!address.getHostName().equals(
- partitionOwner.getWorkerInfo().getHostname()) ||
- address.getPort() !=
- partitionOwner.getWorkerInfo().getPort())) {
- if (LOG.isInfoEnabled()) {
- LOG.info("fixPartitionIdToSocketAddrMap: " +
- "Partition owner " +
- partitionOwner + " changed from " +
- address);
- }
- partitionIndexAddressMap.remove(
- partitionOwner.getPartitionId());
- }
-
+ public void openConnections() {
+ List<WorkerInfo> addresses = Lists.newArrayListWithCapacity(
+ service.getWorkerInfoList().size());
+ for (WorkerInfo info : service.getWorkerInfoList()) {
// No need to connect to myself
- if (service.getWorkerInfo().getTaskId() !=
- partitionOwner.getWorkerInfo().getTaskId()) {
- addressTaskIdMap.put(
- getInetSocketAddress(partitionOwner.getWorkerInfo(),
- partitionOwner.getPartitionId()),
- partitionOwner.getWorkerInfo().getTaskId());
+ if (service.getWorkerInfo().getTaskId() != info.getTaskId()) {
+ addresses.add(info);
}
}
-
- addressTaskIdMap.put(service.getMasterInfo().getInetSocketAddress(),
- null);
- nettyClient.connectAllAddresses(addressTaskIdMap);
- }
-
- @Override
- public InetSocketAddress getInetSocketAddress(WorkerInfo workerInfo,
- int partitionId) {
- InetSocketAddress address = partitionIndexAddressMap.get(partitionId);
- if (address == null) {
- address = resolveAddress(maxResolveAddressAttempts,
- workerInfo.getInetSocketAddress());
- if (partitionId != NO_PARTITION_ID) {
- // Only cache valid partition ids
- partitionIndexAddressMap.put(partitionId, address);
- }
- }
-
- return address;
- }
-
- /**
- * Utility method for getInetSocketAddress()
- *
- * @param maxResolveAddressAttempts Maximum number of attempts to resolve the
- * address
- * @param address the address we are attempting to resolve
- * @return the successfully resolved address.
- * @throws IllegalStateException if the address is not resolved
- * in <code>maxResolveAddressAttempts</code> tries.
- */
- private static InetSocketAddress resolveAddress(
- int maxResolveAddressAttempts, InetSocketAddress address) {
- int resolveAttempts = 0;
- while (address.isUnresolved() &&
- resolveAttempts < maxResolveAddressAttempts) {
- ++resolveAttempts;
- LOG.warn("resolveAddress: Failed to resolve " + address +
- " on attempt " + resolveAttempts + " of " +
- maxResolveAddressAttempts + " attempts, sleeping for 5 seconds");
- try {
- Thread.sleep(5000);
- } catch (InterruptedException e) {
- LOG.warn("resolveAddress: Interrupted.", e);
- }
- address = new InetSocketAddress(address.getHostName(),
- address.getPort());
- }
- if (resolveAttempts >= maxResolveAddressAttempts) {
- throw new IllegalStateException("resolveAddress: Couldn't " +
- "resolve " + address + " in " + resolveAttempts + " tries.");
- }
- return address;
+ addresses.add(service.getMasterInfo());
+ nettyClient.connectAllAddresses(addresses);
}
@Override
@@ -193,9 +97,8 @@ public class NettyWorkerClient<I extends
@Override
public void sendWritableRequest(Integer destWorkerId,
- InetSocketAddress remoteServer,
WritableRequest request) {
- nettyClient.sendWritableRequest(destWorkerId, remoteServer, request);
+ nettyClient.sendWritableRequest(destWorkerId, request);
}
@Override
@@ -216,7 +119,7 @@ public class NettyWorkerClient<I extends
else[HADOOP_NON_SECURE]*/
@Override
public void setup(boolean authenticate) {
- openConnections(service.getPartitionOwners());
+ openConnections();
if (authenticate) {
authenticate();
}
Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java?rev=1399453&r1=1399452&r2=1399453&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java Wed Oct 17 22:05:47 2012
@@ -19,9 +19,7 @@ package org.apache.giraph.comm.netty;
import com.google.common.collect.Maps;
import java.io.IOException;
-import java.net.InetSocketAddress;
import java.util.Collection;
-import java.util.Iterator;
import java.util.Map;
import org.apache.giraph.GiraphConfiguration;
import org.apache.giraph.ImmutableClassesGiraphConfiguration;
@@ -137,11 +135,9 @@ public class NettyWorkerClientRequestPro
if (workerMessageCount >= maxMessagesPerWorker) {
Map<Integer, Map<I, Collection<M>>> workerMessages =
sendMessageCache.removeWorkerMessages(workerInfo);
- InetSocketAddress remoteWorkerAddress =
- workerClient.getInetSocketAddress(workerInfo, partitionId);
WritableRequest writableRequest =
new SendWorkerMessagesRequest<I, V, E, M>(workerMessages);
- doRequest(workerInfo, remoteWorkerAddress, writableRequest);
+ doRequest(workerInfo, writableRequest);
}
}
@@ -149,18 +145,15 @@ public class NettyWorkerClientRequestPro
public void sendPartitionRequest(WorkerInfo workerInfo,
Partition<I, V, E, M> partition) {
final int partitionId = partition.getId();
- InetSocketAddress remoteServerAddress =
- workerClient.getInetSocketAddress(workerInfo, partitionId);
if (LOG.isTraceEnabled()) {
- LOG.trace("sendVertexRequest: Sending to " +
- remoteServerAddress +
- " from " + workerInfo + ", with partition " + partition);
+ LOG.trace("sendVertexRequest: Sending to " + workerInfo +
+ ", with partition " + partition);
}
WritableRequest vertexRequest =
new SendVertexRequest<I, V, E, M>(partitionId,
partition.getVertices());
- doRequest(workerInfo, remoteServerAddress, vertexRequest);
+ doRequest(workerInfo, vertexRequest);
// Messages are stored separately
MessageStoreByPartition<I, M> messageStore =
@@ -180,7 +173,7 @@ public class NettyWorkerClientRequestPro
if (messagesInMap > maxMessagesPerWorker) {
WritableRequest messagesRequest = new
SendPartitionCurrentMessagesRequest<I, V, E, M>(partitionId, map);
- doRequest(workerInfo, remoteServerAddress, messagesRequest);
+ doRequest(workerInfo, messagesRequest);
map.clear();
messagesInMap = 0;
}
@@ -188,7 +181,7 @@ public class NettyWorkerClientRequestPro
if (!map.isEmpty()) {
WritableRequest messagesRequest = new
SendPartitionCurrentMessagesRequest<I, V, E, M>(partitionId, map);
- doRequest(workerInfo, remoteServerAddress, messagesRequest);
+ doRequest(workerInfo, messagesRequest);
}
}
@@ -236,16 +229,12 @@ public class NettyWorkerClientRequestPro
int partitionMutationCount) {
// Send a request if enough mutations are there for a partition
if (partitionMutationCount >= maxMutationsPerPartition) {
- InetSocketAddress remoteServerAddress =
- workerClient.getInetSocketAddress(
- partitionOwner.getWorkerInfo(), partitionId);
Map<I, VertexMutations<I, V, E, M>> partitionMutations =
sendMutationsCache.removePartitionMutations(partitionId);
WritableRequest writableRequest =
new SendPartitionMutationsRequest<I, V, E, M>(
partitionId, partitionMutations);
- doRequest(partitionOwner.getWorkerInfo(), remoteServerAddress,
- writableRequest);
+ doRequest(partitionOwner.getWorkerInfo(), writableRequest);
}
}
@@ -320,15 +309,9 @@ public class NettyWorkerClientRequestPro
remainingMessageCache = sendMessageCache.removeAllMessages();
for (Map.Entry<WorkerInfo, Map<Integer, Map<I, Collection<M>>>> entry :
remainingMessageCache.entrySet()) {
- Iterator<Integer> cachedPartitionId =
- entry.getValue().keySet().iterator();
- final int partitionId = cachedPartitionId.hasNext() ?
- cachedPartitionId.next() : NettyWorkerClient.NO_PARTITION_ID;
- InetSocketAddress remoteWorkerAddress =
- workerClient.getInetSocketAddress(entry.getKey(), partitionId);
WritableRequest writableRequest =
new SendWorkerMessagesRequest<I, V, E, M>(entry.getValue());
- doRequest(entry.getKey(), remoteWorkerAddress, writableRequest);
+ doRequest(entry.getKey(), writableRequest);
}
// Execute the remaining sends mutations (if any)
@@ -342,11 +325,7 @@ public class NettyWorkerClientRequestPro
PartitionOwner partitionOwner =
serviceWorker.getVertexPartitionOwner(
entry.getValue().keySet().iterator().next());
- InetSocketAddress remoteServerAddress =
- workerClient.getInetSocketAddress(partitionOwner.getWorkerInfo(),
- partitionOwner.getPartitionId());
- doRequest(partitionOwner.getWorkerInfo(), remoteServerAddress,
- writableRequest);
+ doRequest(partitionOwner.getWorkerInfo(), writableRequest);
}
}
@@ -366,11 +345,9 @@ public class NettyWorkerClientRequestPro
* When doing the request, short circuit if it is local
*
* @param workerInfo Worker info
- * @param remoteServerAddress Remote server address
* @param writableRequest Request to either submit or run locally
*/
private void doRequest(WorkerInfo workerInfo,
- InetSocketAddress remoteServerAddress,
WritableRequest writableRequest) {
// If this is local, execute locally
if (serviceWorker.getWorkerInfo().getTaskId() ==
@@ -378,7 +355,7 @@ public class NettyWorkerClientRequestPro
((WorkerRequest) writableRequest).doRequest(serverData);
} else {
workerClient.sendWritableRequest(
- workerInfo.getTaskId(), remoteServerAddress, writableRequest);
+ workerInfo.getTaskId(), writableRequest);
}
}
}
Added: giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/AddressesAndPartitionsWritable.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/AddressesAndPartitionsWritable.java?rev=1399453&view=auto
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/AddressesAndPartitionsWritable.java (added)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/AddressesAndPartitionsWritable.java Wed Oct 17 22:05:47 2012
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.graph;
+
+import org.apache.giraph.graph.partition.PartitionOwner;
+import org.apache.hadoop.io.Writable;
+
+import com.google.common.collect.Lists;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+
+/**
+ * Helper Writable that serializes master info, worker infos and partition owners
+ */
+public class AddressesAndPartitionsWritable implements Writable {
+ /** Master information */
+ private WorkerInfo masterInfo;
+ /** List of all workers */
+ private List<WorkerInfo> workerInfos;
+ /** Collection of partition owners */
+ private Collection<PartitionOwner> partitionOwners;
+ /** Partition owner class instantiated by readFields (unset when serializing) */
+ private Class<? extends PartitionOwner> partitionOwnerClass;
+
+ /**
+ * Constructor for serialization; the partition owner class is left unset
+ *
+ * @param masterInfo Master information
+ * @param workerInfos List of all workers
+ * @param partitionOwners Collection of partition owners
+ */
+ public AddressesAndPartitionsWritable(WorkerInfo masterInfo,
+ List<WorkerInfo> workerInfos,
+ Collection<PartitionOwner> partitionOwners) {
+ this.masterInfo = masterInfo;
+ this.workerInfos = workerInfos;
+ this.partitionOwners = partitionOwners;
+ }
+
+ /**
+ * Constructor for deserialization; readFields fills in the data fields
+ *
+ * @param partitionOwnerClass Partition owner class to instantiate in readFields
+ */
+ public AddressesAndPartitionsWritable(
+ Class<? extends PartitionOwner> partitionOwnerClass) {
+ this.partitionOwnerClass = partitionOwnerClass;
+ }
+
+ /**
+ * Get master information
+ *
+ * @return Master information
+ */
+ public WorkerInfo getMasterInfo() {
+ return masterInfo;
+ }
+
+ /**
+ * Get all workers
+ *
+ * @return List of all workers
+ */
+ public List<WorkerInfo> getWorkerInfos() {
+ return workerInfos;
+ }
+
+ /**
+ * Get partition owners
+ *
+ * @return Collection of partition owners
+ */
+ public Collection<PartitionOwner> getPartitionOwners() {
+ return partitionOwners;
+ }
+
+ @Override
+ public void write(DataOutput output) throws IOException {
+ masterInfo.write(output); // master first; readFields reads in this same order
+
+ output.writeInt(workerInfos.size()); // count prefix, read back by readFields
+ for (WorkerInfo workerInfo : workerInfos) {
+ workerInfo.write(output);
+ }
+
+ output.writeInt(partitionOwners.size()); // count prefix, read back by readFields
+ for (PartitionOwner partitionOwner : partitionOwners) {
+ partitionOwner.write(output);
+ }
+ }
+
+ @Override
+ public void readFields(DataInput input) throws IOException {
+ masterInfo = new WorkerInfo(); // mirrors write(): master, then workers, then owners
+ masterInfo.readFields(input);
+
+ int workerInfosSize = input.readInt(); // count written by write()
+ workerInfos = Lists.newArrayListWithCapacity(workerInfosSize);
+ for (int i = 0; i < workerInfosSize; i++) {
+ WorkerInfo workerInfo = new WorkerInfo();
+ workerInfo.readFields(input);
+ workerInfos.add(workerInfo);
+ }
+
+ int partitionOwnersSize = input.readInt(); // count written by write()
+ partitionOwners = Lists.newArrayListWithCapacity(partitionOwnersSize);
+ for (int i = 0; i < partitionOwnersSize; i++) {
+ try { // reflective instantiation of partitionOwnerClass; failures are rethrown unchecked
+ PartitionOwner partitionOwner = partitionOwnerClass.newInstance();
+ partitionOwner.readFields(input);
+ partitionOwners.add(partitionOwner);
+ } catch (InstantiationException e) {
+ throw new IllegalStateException("readFields: " +
+ "InstantiationException on partition owner class " +
+ partitionOwnerClass, e);
+ } catch (IllegalAccessException e) {
+ throw new IllegalStateException("readFields: " +
+ "IllegalAccessException on partition owner class " +
+ partitionOwnerClass, e);
+ }
+ }
+ }
+}
Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspService.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspService.java?rev=1399453&r1=1399452&r2=1399453&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspService.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspService.java Wed Oct 17 22:05:47 2012
@@ -93,8 +93,6 @@ public abstract class BspService<I exten
"/_applicationAttemptsDir";
/** Where the master election happens */
public static final String MASTER_ELECTION_DIR = "/_masterElectionDir";
- /** Denotes current master's info */
- public static final String CURRENT_MASTER_INFO_NODE = "/_currentMaster";
/** Superstep scope */
public static final String SUPERSTEP_DIR = "/_superstepDir";
/** Where the merged aggregators are located */
@@ -109,9 +107,9 @@ public abstract class BspService<I exten
"/_workerWroteCheckpointDir";
/** Finished workers notify here */
public static final String WORKER_FINISHED_DIR = "/_workerFinishedDir";
- /** Where the partition assignments are set */
- public static final String PARTITION_ASSIGNMENTS_DIR =
- "/_partitionAssignments";
+ /** Where the master and worker addresses and partition assignments are set */
+ public static final String ADDRESSES_AND_PARTITIONS_DIR =
+ "/_addressesAndPartitions";
/** Helps coordinate the partition exchanges */
public static final String PARTITION_EXCHANGE_DIR =
"/_partitionExchangeDir";
@@ -202,8 +200,6 @@ public abstract class BspService<I exten
protected final String checkpointBasePath;
/** Path to the master election path */
protected final String masterElectionPath;
- /** Path to current master info */
- protected final String currentMasterPath;
/** Private ZooKeeper instance that implements the service */
private final ZooKeeperExt zk;
/** Has the Connection occurred? */
@@ -218,8 +214,8 @@ public abstract class BspService<I exten
private final BspEvent inputSplitsAllDoneChanged;
/** InputSplit done by a worker finished notification and synchronization */
private final BspEvent inputSplitsDoneStateChanged;
- /** Are the partition assignments to workers ready? */
- private final BspEvent partitionAssignmentsReadyChanged;
+ /** Are the addresses and partition assignments to workers ready? */
+ private final BspEvent addressesAndPartitionsReadyChanged;
/** Application attempt changed */
private final BspEvent applicationAttemptChanged;
/** Superstep finished synchronization */
@@ -276,7 +272,7 @@ public abstract class BspService<I exten
this.inputSplitsStateChanged = new PredicateLock(context);
this.inputSplitsAllDoneChanged = new PredicateLock(context);
this.inputSplitsDoneStateChanged = new PredicateLock(context);
- this.partitionAssignmentsReadyChanged = new PredicateLock(context);
+ this.addressesAndPartitionsReadyChanged = new PredicateLock(context);
this.applicationAttemptChanged = new PredicateLock(context);
this.superstepFinished = new PredicateLock(context);
this.masterElectionChildrenChanged = new PredicateLock(context);
@@ -286,7 +282,7 @@ public abstract class BspService<I exten
registerBspEvent(workerHealthRegistrationChanged);
registerBspEvent(inputSplitsAllReadyChanged);
registerBspEvent(inputSplitsStateChanged);
- registerBspEvent(partitionAssignmentsReadyChanged);
+ registerBspEvent(addressesAndPartitionsReadyChanged);
registerBspEvent(applicationAttemptChanged);
registerBspEvent(superstepFinished);
registerBspEvent(masterElectionChildrenChanged);
@@ -332,7 +328,6 @@ public abstract class BspService<I exten
GiraphConfiguration.CHECKPOINT_DIRECTORY,
GiraphConfiguration.CHECKPOINT_DIRECTORY_DEFAULT + "/" + getJobId());
masterElectionPath = basePath + MASTER_ELECTION_DIR;
- currentMasterPath = basePath + CURRENT_MASTER_INFO_NODE;
if (LOG.isInfoEnabled()) {
LOG.info("BspService: Connecting to ZooKeeper with job " + jobId +
", " + getTaskPartition() + " on " + serverPortList);
@@ -460,16 +455,16 @@ public abstract class BspService<I exten
}
/**
- * Generate the "partiton assignments" directory path for a superstep
+ * Generate the "addresses and partitions" directory path for a superstep
*
* @param attempt application attempt number
* @param superstep superstep to use
* @return directory path based on the superstep
*/
- public final String getPartitionAssignmentsPath(long attempt,
+ public final String getAddressesAndPartitionsPath(long attempt,
long superstep) {
return applicationAttemptsPath + "/" + attempt +
- SUPERSTEP_DIR + "/" + superstep + PARTITION_ASSIGNMENTS_DIR;
+ SUPERSTEP_DIR + "/" + superstep + ADDRESSES_AND_PARTITIONS_DIR;
}
/**
@@ -656,8 +651,8 @@ public abstract class BspService<I exten
return inputSplitsDoneStateChanged;
}
- public final BspEvent getPartitionAssignmentsReadyChangedEvent() {
- return partitionAssignmentsReadyChanged;
+ public final BspEvent getAddressesAndPartitionsReadyChangedEvent() {
+ return addressesAndPartitionsReadyChanged;
}
@@ -1014,13 +1009,13 @@ public abstract class BspService<I exten
}
inputSplitsAllDoneChanged.signal();
eventProcessed = true;
- } else if (event.getPath().contains(PARTITION_ASSIGNMENTS_DIR) &&
+ } else if (event.getPath().contains(ADDRESSES_AND_PARTITIONS_DIR) &&
event.getType() == EventType.NodeCreated) {
if (LOG.isInfoEnabled()) {
LOG.info("process: partitionAssignmentsReadyChanged " +
"(partitions are assigned)");
}
- partitionAssignmentsReadyChanged.signal();
+ addressesAndPartitionsReadyChanged.signal();
eventProcessed = true;
} else if (event.getPath().contains(SUPERSTEP_FINISHED_NODE) &&
event.getType() == EventType.NodeCreated) {
Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspServiceMaster.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspServiceMaster.java?rev=1399453&r1=1399452&r2=1399453&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspServiceMaster.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspServiceMaster.java Wed Oct 17 22:05:47 2012
@@ -55,6 +55,7 @@ import org.apache.zookeeper.ZooDefs.Ids;
import org.json.JSONException;
import org.json.JSONObject;
+import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import net.iharder.Base64;
@@ -149,6 +150,8 @@ public class BspServiceMaster<I extends
private MasterClientServer commService;
/** Master info */
private WorkerInfo masterInfo;
+ /** List of workers in current superstep */
+ private List<WorkerInfo> chosenWorkerInfoList = Lists.newArrayList();
/** Limit locality information added to each InputSplit znode */
private final int localityLimit = 5;
@@ -602,6 +605,11 @@ public class BspServiceMaster<I extends
}
@Override
+ public List<WorkerInfo> getWorkerInfoList() {
+ return chosenWorkerInfoList;
+ }
+
+ @Override
public MasterAggregatorUsage getAggregatorUsage() {
return aggregatorHandler;
}
@@ -763,12 +771,9 @@ public class BspServiceMaster<I extends
aggregatorHandler.initialize(this);
commService = new NettyMasterClientServer(
- getContext(), getConfiguration());
+ getContext(), getConfiguration(), this);
masterInfo = new WorkerInfo(getHostname(), getTaskPartition(),
commService.getMyAddress().getPort());
- // write my address to znode so workers could read it
- WritableUtils.writeToZnode(getZkExt(), currentMasterPath, -1,
- masterInfo);
if (LOG.isInfoEnabled()) {
LOG.info("becomeMaster: I am now the master!");
@@ -796,7 +801,7 @@ public class BspServiceMaster<I extends
* @return Global statistics aggregated on all worker statistics
*/
private GlobalStats aggregateWorkerStats(long superstep) {
- Class<? extends Writable> partitionStatsClass =
+ Class<? extends PartitionStats> partitionStatsClass =
masterGraphPartitioner.createPartitionStats().getClass();
GlobalStats globalStats = new GlobalStats();
// Get the stats from all the worker selected nodes
@@ -822,15 +827,15 @@ public class BspServiceMaster<I extends
byte [] zkData =
getZkExt().getData(finishedPath, false, null);
workerFinishedInfoObj = new JSONObject(new String(zkData));
- List<? extends Writable> writableList =
+ List<PartitionStats> statsList =
WritableUtils.readListFieldsFromByteArray(
Base64.decode(workerFinishedInfoObj.getString(
JSONOBJ_PARTITION_STATS_KEY)),
partitionStatsClass,
getConfiguration());
- for (Writable writable : writableList) {
- globalStats.addPartitionStats((PartitionStats) writable);
- allPartitionStatsList.add((PartitionStats) writable);
+ for (PartitionStats partitionStats : statsList) {
+ globalStats.addPartitionStats(partitionStats);
+ allPartitionStatsList.add(partitionStats);
}
globalStats.addMessageCount(
workerFinishedInfoObj.getLong(
@@ -986,14 +991,17 @@ public class BspServiceMaster<I extends
}
// Workers are waiting for these assignments
- String partitionAssignmentsPath =
- getPartitionAssignmentsPath(getApplicationAttempt(),
+ AddressesAndPartitionsWritable addressesAndPartitions =
+ new AddressesAndPartitionsWritable(masterInfo, chosenWorkerInfoList,
+ partitionOwners);
+ String addressesAndPartitionsPath =
+ getAddressesAndPartitionsPath(getApplicationAttempt(),
getSuperstep());
- WritableUtils.writeListToZnode(
+ WritableUtils.writeToZnode(
getZkExt(),
- partitionAssignmentsPath,
+ addressesAndPartitionsPath,
-1,
- new ArrayList<Writable>(partitionOwners));
+ addressesAndPartitions);
}
/**
@@ -1252,7 +1260,7 @@ public class BspServiceMaster<I extends
// 5. Create superstep finished node
// 6. If the checkpoint frequency is met, finalize the checkpoint
- List<WorkerInfo> chosenWorkerInfoList = checkWorkers();
+ chosenWorkerInfoList = checkWorkers();
if (chosenWorkerInfoList == null) {
LOG.fatal("coordinateSuperstep: Not enough healthy workers for " +
"superstep " + getSuperstep());
@@ -1271,7 +1279,7 @@ public class BspServiceMaster<I extends
}
}
- commService.fixWorkerAddresses(chosenWorkerInfoList);
+ commService.openConnections();
currentWorkersCounter.increment(chosenWorkerInfoList.size() -
currentWorkersCounter.getValue());
Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspServiceWorker.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspServiceWorker.java?rev=1399453&r1=1399452&r2=1399453&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspServiceWorker.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspServiceWorker.java Wed Oct 17 22:05:47 2012
@@ -107,17 +107,13 @@ public class BspServiceWorker<I extends
private final WorkerServer<I, V, E, M> workerServer;
/** Master info */
private WorkerInfo masterInfo = new WorkerInfo();
+ /** List of workers */
+ private List<WorkerInfo> workerInfoList = Lists.newArrayList();
/** Have the partition exchange children (workers) changed? */
private final BspEvent partitionExchangeChildrenChanged;
/** Worker Context */
private final WorkerContext workerContext;
- /** Total vertices loaded */
- private long totalVerticesLoaded = 0;
- /** Total edges loaded */
- private long totalEdgesLoaded = 0;
- /** Input split max vertices (-1 denotes all) */
- private final long inputSplitMaxVertices;
/**
* Stores and processes the list of InputSplits advertised
* in a tree of child znodes by the master.
@@ -146,7 +142,6 @@ public class BspServiceWorker<I extends
super(serverPortList, sessionMsecTimeout, context, graphMapper);
partitionExchangeChildrenChanged = new PredicateLock(context);
registerBspEvent(partitionExchangeChildrenChanged);
- inputSplitMaxVertices = getConfiguration().getInputSplitMaxVertices();
workerGraphPartitioner =
getGraphPartitionerFactory().createWorkerGraphPartitioner();
workerServer = new NettyWorkerServer<I, V, E, M>(getConfiguration(),
@@ -342,6 +337,11 @@ public class BspServiceWorker<I extends
}
@Override
+ public List<WorkerInfo> getWorkerInfoList() {
+ return workerInfoList;
+ }
+
+ @Override
public FinishedSuperstepStats setup() {
// Unless doing a restart, prepare for computation:
// 1. Start superstep INPUT_SUPERSTEP (no computation)
@@ -603,29 +603,24 @@ else[HADOOP_NON_SECURE]*/
registerHealth(getSuperstep());
- String partitionAssignmentsNode =
- getPartitionAssignmentsPath(getApplicationAttempt(),
+ String addressesAndPartitionsPath =
+ getAddressesAndPartitionsPath(getApplicationAttempt(),
getSuperstep());
- Collection<? extends PartitionOwner> masterSetPartitionOwners;
+ AddressesAndPartitionsWritable addressesAndPartitions =
+ new AddressesAndPartitionsWritable(
+ workerGraphPartitioner.createPartitionOwner().getClass());
try {
- while (getZkExt().exists(partitionAssignmentsNode, true) ==
+ while (getZkExt().exists(addressesAndPartitionsPath, true) ==
null) {
- getPartitionAssignmentsReadyChangedEvent().waitForever();
- getPartitionAssignmentsReadyChangedEvent().reset();
+ getAddressesAndPartitionsReadyChangedEvent().waitForever();
+ getAddressesAndPartitionsReadyChangedEvent().reset();
}
- List<? extends Writable> writableList =
- WritableUtils.readListFieldsFromZnode(
- getZkExt(),
- partitionAssignmentsNode,
- false,
- null,
- workerGraphPartitioner.createPartitionOwner().getClass(),
- getConfiguration());
-
- @SuppressWarnings("unchecked")
- Collection<? extends PartitionOwner> castedWritableList =
- (Collection<? extends PartitionOwner>) writableList;
- masterSetPartitionOwners = castedWritableList;
+ WritableUtils.readFieldsFromZnode(
+ getZkExt(),
+ addressesAndPartitionsPath,
+ false,
+ null,
+ addressesAndPartitions);
} catch (KeeperException e) {
throw new IllegalStateException(
"startSuperstep: KeeperException getting assignments", e);
@@ -634,15 +629,15 @@ else[HADOOP_NON_SECURE]*/
"startSuperstep: InterruptedException getting assignments", e);
}
- // get address of master
- WritableUtils.readFieldsFromZnode(getZkExt(), currentMasterPath, false,
- null, masterInfo);
+ workerInfoList.clear();
+ workerInfoList = addressesAndPartitions.getWorkerInfos();
+ masterInfo = addressesAndPartitions.getMasterInfo();
if (LOG.isInfoEnabled()) {
LOG.info("startSuperstep: Ready for computation on superstep " +
getSuperstep() + " since worker " +
"selection and vertex range assignments are done in " +
- partitionAssignmentsNode);
+ addressesAndPartitionsPath);
}
if (getSuperstep() != INPUT_SUPERSTEP) {
@@ -652,7 +647,7 @@ else[HADOOP_NON_SECURE]*/
getGraphMapper().getMapFunctions().toString() +
" - Attempt=" + getApplicationAttempt() +
", Superstep=" + getSuperstep());
- return masterSetPartitionOwners;
+ return addressesAndPartitions.getPartitionOwners();
}
@Override
@@ -1151,7 +1146,7 @@ else[HADOOP_NON_SECURE]*/
PartitionExchange partitionExchange =
workerGraphPartitioner.updatePartitionOwners(
getWorkerInfo(), masterSetPartitionOwners, getPartitionStore());
- workerClient.openConnections(getPartitionOwners());
+ workerClient.openConnections();
Map<WorkerInfo, List<Integer>> sendWorkerPartitionMap =
partitionExchange.getSendWorkerPartitionMap();
@@ -1263,11 +1258,6 @@ else[HADOOP_NON_SECURE]*/
}
@Override
- public Collection<? extends PartitionOwner> getPartitionOwners() {
- return workerGraphPartitioner.getPartitionOwners();
- }
-
- @Override
public PartitionOwner getVertexPartitionOwner(I vertexId) {
return workerGraphPartitioner.getPartitionOwner(vertexId);
}
Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/InputSplitsCallable.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/InputSplitsCallable.java?rev=1399453&r1=1399452&r2=1399453&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/InputSplitsCallable.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/InputSplitsCallable.java Wed Oct 17 22:05:47 2012
@@ -55,7 +55,7 @@ import org.apache.zookeeper.data.Stat;
*/
public class InputSplitsCallable<I extends WritableComparable,
V extends Writable, E extends Writable, M extends Writable>
- implements Callable {
+ implements Callable<VertexEdgeCount> {
/** Class logger */
private static final Logger LOG =
Logger.getLogger(InputSplitsCallable.class);
Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/WorkerInfo.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/WorkerInfo.java?rev=1399453&r1=1399452&r2=1399453&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/WorkerInfo.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/WorkerInfo.java Wed Oct 17 22:05:47 2012
@@ -58,6 +58,16 @@ public class WorkerInfo implements Writa
this.hostnameId = hostname + "_" + taskId;
}
+ /**
+ * Constructor with InetSocketAddress
+ *
+ * @param address Address of this worker
+ * @param taskId The task partition for this worker
+ */
+ public WorkerInfo(InetSocketAddress address, int taskId) {
+ this(address.getHostName(), taskId, address.getPort());
+ }
+
public String getHostname() {
return hostname;
}
Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/WritableUtils.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/WritableUtils.java?rev=1399453&r1=1399452&r2=1399453&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/WritableUtils.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/WritableUtils.java Wed Oct 17 22:05:47 2012
@@ -193,24 +193,25 @@ public class WritableUtils {
}
/**
- * Read fields from byteArray to a list of Writable objects.
+ * Read fields from byteArray to a list of objects.
*
* @param byteArray Byte array to find the fields in.
* @param writableClass Class of the objects to instantiate.
* @param conf Configuration used for instantiation (i.e Configurable)
- * @return List of writable objects.
+ * @param <T> Object type
+ * @return List of objects.
*/
- public static List<? extends Writable> readListFieldsFromByteArray(
+ public static <T extends Writable> List<T> readListFieldsFromByteArray(
byte[] byteArray,
- Class<? extends Writable> writableClass,
+ Class<? extends T> writableClass,
Configuration conf) {
try {
DataInputStream inputStream =
new DataInputStream(new ByteArrayInputStream(byteArray));
int size = inputStream.readInt();
- List<Writable> writableList = new ArrayList<Writable>(size);
+ List<T> writableList = new ArrayList<T>(size);
for (int i = 0; i < size; ++i) {
- Writable writable =
+ T writable =
ReflectionUtils.newInstance(writableClass, conf);
writable.readFields(inputStream);
writableList.add(writable);
@@ -231,18 +232,20 @@ public class WritableUtils {
* @param stat Stat of znode if desired.
* @param writableClass Class of the objects to instantiate.
* @param conf Configuration used for instantiation (i.e Configurable)
- * @return List of writable objects.
+ * @param <T> Object type
+ * @return List of objects.
*/
- public static List<? extends Writable> readListFieldsFromZnode(
+ public static <T extends Writable> List<T> readListFieldsFromZnode(
ZooKeeperExt zkExt,
String zkPath,
boolean watch,
Stat stat,
- Class<? extends Writable> writableClass,
+ Class<? extends T> writableClass,
Configuration conf) {
try {
byte[] zkData = zkExt.getData(zkPath, false, stat);
- return readListFieldsFromByteArray(zkData, writableClass, conf);
+ return WritableUtils.<T>readListFieldsFromByteArray(zkData,
+ writableClass, conf);
} catch (KeeperException e) {
throw new IllegalStateException(
"readListFieldsFromZnode: KeeperException on " + zkPath, e);
Modified: giraph/trunk/giraph/src/test/java/org/apache/giraph/comm/ConnectionTest.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/test/java/org/apache/giraph/comm/ConnectionTest.java?rev=1399453&r1=1399452&r2=1399453&view=diff
==============================================================================
--- giraph/trunk/giraph/src/test/java/org/apache/giraph/comm/ConnectionTest.java (original)
+++ giraph/trunk/giraph/src/test/java/org/apache/giraph/comm/ConnectionTest.java Wed Oct 17 22:05:47 2012
@@ -18,8 +18,9 @@
package org.apache.giraph.comm;
-import com.google.common.collect.Maps;
-import java.util.Map;
+import com.google.common.collect.Lists;
+
+import java.util.List;
import org.apache.giraph.GiraphConfiguration;
import org.apache.giraph.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.comm.messages.SimpleMessageStore;
@@ -28,6 +29,7 @@ import org.apache.giraph.comm.netty.Nett
import org.apache.giraph.comm.netty.NettyServer;
import org.apache.giraph.comm.netty.handler.WorkerRequestServerHandler;
import org.apache.giraph.graph.EdgeListVertex;
+import org.apache.giraph.graph.WorkerInfo;
import org.apache.giraph.utils.MockUtils;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Mapper.Context;
@@ -38,8 +40,6 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.Collections;
/**
* Test the netty connections
@@ -85,9 +85,9 @@ public class ConnectionTest {
server.start();
NettyClient client = new NettyClient(context, conf);
- Map<InetSocketAddress, Integer> addressIdMap = Maps.newHashMap();
- addressIdMap.put(server.getMyAddress(), -1);
- client.connectAllAddresses(addressIdMap);
+ client.connectAllAddresses(
+ Lists.<WorkerInfo>newArrayList(
+ new WorkerInfo(server.getMyAddress(), -1)));
client.stop();
server.stop();
@@ -121,11 +121,12 @@ public class ConnectionTest {
server3.start();
NettyClient client = new NettyClient(context, conf);
- Map<InetSocketAddress, Integer> addressIdMap = Maps.newHashMap();
- addressIdMap.put(server1.getMyAddress(), -1);
- addressIdMap.put(server2.getMyAddress(), -1);
- addressIdMap.put(server3.getMyAddress(), -1);
- client.connectAllAddresses(addressIdMap);
+ WorkerInfo workerInfo1 = new WorkerInfo(server1.getMyAddress(), 1);
+ WorkerInfo workerInfo2 = new WorkerInfo(server2.getMyAddress(), 2);
+ WorkerInfo workerInfo3 = new WorkerInfo(server3.getMyAddress(), 3);
+ List<WorkerInfo> addresses = Lists.<WorkerInfo>newArrayList(workerInfo1,
+ workerInfo2, workerInfo3);
+ client.connectAllAddresses(addresses);
client.stop();
server1.stop();
@@ -154,14 +155,15 @@ public class ConnectionTest {
new WorkerRequestServerHandler.Factory(serverData));
server.start();
- Map<InetSocketAddress, Integer> addressIdMap = Maps.newHashMap();
- addressIdMap.put(server.getMyAddress(), -1);
+ List<WorkerInfo> addresses =
+ Lists.<WorkerInfo>newArrayList(
+ new WorkerInfo(server.getMyAddress(), -1));
NettyClient client1 = new NettyClient(context, conf);
- client1.connectAllAddresses(addressIdMap);
+ client1.connectAllAddresses(addresses);
NettyClient client2 = new NettyClient(context, conf);
- client2.connectAllAddresses(addressIdMap);
+ client2.connectAllAddresses(addresses);
NettyClient client3 = new NettyClient(context, conf);
- client3.connectAllAddresses(addressIdMap);
+ client3.connectAllAddresses(addresses);
client1.stop();
client2.stop();
Modified: giraph/trunk/giraph/src/test/java/org/apache/giraph/comm/RequestFailureTest.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/test/java/org/apache/giraph/comm/RequestFailureTest.java?rev=1399453&r1=1399452&r2=1399453&view=diff
==============================================================================
--- giraph/trunk/giraph/src/test/java/org/apache/giraph/comm/RequestFailureTest.java (original)
+++ giraph/trunk/giraph/src/test/java/org/apache/giraph/comm/RequestFailureTest.java Wed Oct 17 22:05:47 2012
@@ -18,7 +18,6 @@
package org.apache.giraph.comm;
-import java.net.InetSocketAddress;
import org.apache.giraph.GiraphConfiguration;
import org.apache.giraph.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.comm.messages.SimpleMessageStore;
@@ -28,6 +27,7 @@ import org.apache.giraph.comm.netty.hand
import org.apache.giraph.comm.requests.SendWorkerMessagesRequest;
import org.apache.giraph.comm.requests.WritableRequest;
import org.apache.giraph.graph.EdgeListVertex;
+import org.apache.giraph.graph.WorkerInfo;
import org.apache.giraph.utils.MockUtils;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Mapper.Context;
@@ -43,7 +43,6 @@ import com.google.common.collect.Maps;
import java.io.IOException;
import java.util.Collection;
-import java.util.Collections;
import java.util.Map;
/**
@@ -141,15 +140,15 @@ public class RequestFailureTest {
new WorkerRequestServerHandler.Factory(serverData));
server.start();
client = new NettyClient(context, conf);
- Map<InetSocketAddress, Integer> addressIdMap = Maps.newHashMap();
- addressIdMap.put(server.getMyAddress(), -1);
- client.connectAllAddresses(addressIdMap);
+ client.connectAllAddresses(
+ Lists.<WorkerInfo>newArrayList(
+ new WorkerInfo(server.getMyAddress(), -1)));
// Send the request 2x
WritableRequest request1 = getRequest();
WritableRequest request2 = getRequest();
- client.sendWritableRequest(-1, server.getMyAddress(), request1);
- client.sendWritableRequest(-1, server.getMyAddress(), request2);
+ client.sendWritableRequest(-1, request1);
+ client.sendWritableRequest(-1, request2);
client.waitAllRequests();
// Stop the service
@@ -179,15 +178,15 @@ public class RequestFailureTest {
new WorkerRequestServerHandler.Factory(serverData));
server.start();
client = new NettyClient(context, conf);
- Map<InetSocketAddress, Integer> addressIdMap = Maps.newHashMap();
- addressIdMap.put(server.getMyAddress(), -1);
- client.connectAllAddresses(addressIdMap);
+ client.connectAllAddresses(
+ Lists.<WorkerInfo>newArrayList(
+ new WorkerInfo(server.getMyAddress(), -1)));
// Send the request 2x, but should only be processed once
WritableRequest request1 = getRequest();
WritableRequest request2 = getRequest();
- client.sendWritableRequest(-1, server.getMyAddress(), request1);
- client.sendWritableRequest(-1, server.getMyAddress(), request2);
+ client.sendWritableRequest(-1, request1);
+ client.sendWritableRequest(-1, request2);
client.waitAllRequests();
// Stop the service
@@ -216,15 +215,15 @@ public class RequestFailureTest {
new WorkerRequestServerHandler.Factory(serverData));
server.start();
client = new NettyClient(context, conf);
- Map<InetSocketAddress, Integer> addressIdMap = Maps.newHashMap();
- addressIdMap.put(server.getMyAddress(), -1);
- client.connectAllAddresses(addressIdMap);
+ client.connectAllAddresses(
+ Lists.<WorkerInfo>newArrayList(
+ new WorkerInfo(server.getMyAddress(), -1)));
// Send the request 2x, but should only be processed once
WritableRequest request1 = getRequest();
WritableRequest request2 = getRequest();
- client.sendWritableRequest(-1, server.getMyAddress(), request1);
- client.sendWritableRequest(-1, server.getMyAddress(), request2);
+ client.sendWritableRequest(-1, request1);
+ client.sendWritableRequest(-1, request2);
client.waitAllRequests();
// Stop the service
Modified: giraph/trunk/giraph/src/test/java/org/apache/giraph/comm/RequestTest.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/test/java/org/apache/giraph/comm/RequestTest.java?rev=1399453&r1=1399452&r2=1399453&view=diff
==============================================================================
--- giraph/trunk/giraph/src/test/java/org/apache/giraph/comm/RequestTest.java (original)
+++ giraph/trunk/giraph/src/test/java/org/apache/giraph/comm/RequestTest.java Wed Oct 17 22:05:47 2012
@@ -18,7 +18,6 @@
package org.apache.giraph.comm;
-import java.net.InetSocketAddress;
import org.apache.giraph.GiraphConfiguration;
import org.apache.giraph.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.comm.messages.SimpleMessageStore;
@@ -32,6 +31,7 @@ import org.apache.giraph.graph.Edge;
import org.apache.giraph.graph.EdgeListVertex;
import org.apache.giraph.graph.Vertex;
import org.apache.giraph.graph.VertexMutations;
+import org.apache.giraph.graph.WorkerInfo;
import org.apache.giraph.graph.partition.PartitionStore;
import org.apache.giraph.utils.MockUtils;
import org.apache.hadoop.io.IntWritable;
@@ -50,7 +50,6 @@ import com.google.common.collect.Maps;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.Collections;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
@@ -102,9 +101,9 @@ public class RequestTest {
new WorkerRequestServerHandler.Factory(serverData));
server.start();
client = new NettyClient(context, conf);
- Map<InetSocketAddress, Integer> addressIdMap = Maps.newHashMap();
- addressIdMap.put(server.getMyAddress(), -1);
- client.connectAllAddresses(addressIdMap);
+ client.connectAllAddresses(
+ Lists.<WorkerInfo>newArrayList(
+ new WorkerInfo(server.getMyAddress(), -1)));
}
@Test
@@ -126,7 +125,7 @@ public class RequestTest {
IntWritable> request =
new SendVertexRequest<IntWritable, IntWritable,
IntWritable, IntWritable>(partitionId, vertices);
- client.sendWritableRequest(-1, server.getMyAddress(), request);
+ client.sendWritableRequest(-1, request);
client.waitAllRequests();
// Stop the service
@@ -170,7 +169,7 @@ public class RequestTest {
IntWritable> request =
new SendWorkerMessagesRequest<IntWritable, IntWritable,
IntWritable, IntWritable>(sendMap);
- client.sendWritableRequest(-1, server.getMyAddress(), request);
+ client.sendWritableRequest(-1, request);
client.waitAllRequests();
// Stop the service
@@ -233,7 +232,7 @@ public class RequestTest {
IntWritable> request =
new SendPartitionMutationsRequest<IntWritable, IntWritable,
IntWritable, IntWritable>(partitionId, vertexIdMutations);
- client.sendWritableRequest(-1, server.getMyAddress(), request);
+ client.sendWritableRequest(-1, request);
client.waitAllRequests();
// Stop the service
Modified: giraph/trunk/giraph/src/test/java/org/apache/giraph/comm/SaslConnectionTest.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/test/java/org/apache/giraph/comm/SaslConnectionTest.java?rev=1399453&r1=1399452&r2=1399453&view=diff
==============================================================================
--- giraph/trunk/giraph/src/test/java/org/apache/giraph/comm/SaslConnectionTest.java (original)
+++ giraph/trunk/giraph/src/test/java/org/apache/giraph/comm/SaslConnectionTest.java Wed Oct 17 22:05:47 2012
@@ -18,9 +18,7 @@
package org.apache.giraph.comm;
-import com.google.common.collect.Maps;
-import java.net.InetSocketAddress;
-import java.util.Map;
+import com.google.common.collect.Lists;
import org.apache.giraph.GiraphConfiguration;
import org.apache.giraph.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.comm.messages.SimpleMessageStore;
@@ -29,6 +27,7 @@ import org.apache.giraph.comm.netty.Nett
import org.apache.giraph.comm.netty.handler.SaslServerHandler;
import org.apache.giraph.comm.netty.handler.WorkerRequestServerHandler;
import org.apache.giraph.graph.EdgeListVertex;
+import org.apache.giraph.graph.WorkerInfo;
import org.apache.giraph.utils.MockUtils;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Mapper.Context;
@@ -96,9 +95,9 @@ public class SaslConnectionTest {
server.start();
NettyClient client = new NettyClient(context, conf);
- Map<InetSocketAddress, Integer> addressIdMap = Maps.newHashMap();
- addressIdMap.put(server.getMyAddress(), -1);
- client.connectAllAddresses(addressIdMap);
+ client.connectAllAddresses(
+ Lists.<WorkerInfo>newArrayList(
+ new WorkerInfo(server.getMyAddress(), -1)));
client.stop();
server.stop();