You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ma...@apache.org on 2012/11/13 21:03:11 UTC
svn commit: r1408926 - in /giraph/trunk: ./
giraph/src/main/java/org/apache/giraph/bsp/
giraph/src/main/java/org/apache/giraph/comm/
giraph/src/main/java/org/apache/giraph/comm/netty/
giraph/src/main/java/org/apache/giraph/comm/netty/handler/ giraph/sr...
Author: maja
Date: Tue Nov 13 20:03:07 2012
New Revision: 1408926
URL: http://svn.apache.org/viewvc?rev=1408926&view=rev
Log:
GIRAPH-386: ClassCastException when giraph.SplitMasterWorker=false
Added:
giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/MasterInfo.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/TaskInfo.java
Removed:
giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/MasterClientServer.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyMasterClientServer.java
Modified:
giraph/trunk/CHANGELOG
giraph/trunk/giraph/src/main/java/org/apache/giraph/bsp/CentralizedServiceMaster.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/WorkerClient.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/WorkerServer.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyClient.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyMasterClient.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyMasterServer.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyServer.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClient.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/handler/ClientRequestId.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/handler/MasterRequestServerHandler.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/handler/RequestServerHandler.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/handler/WorkerRequestServerHandler.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/AddressesAndPartitionsWritable.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspService.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspServiceMaster.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspServiceWorker.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/MasterAggregatorHandler.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/WorkerInfo.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/zk/ZooKeeperManager.java
giraph/trunk/giraph/src/test/java/org/apache/giraph/comm/ConnectionTest.java
giraph/trunk/giraph/src/test/java/org/apache/giraph/comm/RequestFailureTest.java
giraph/trunk/giraph/src/test/java/org/apache/giraph/comm/RequestTest.java
giraph/trunk/giraph/src/test/java/org/apache/giraph/comm/SaslConnectionTest.java
Modified: giraph/trunk/CHANGELOG
URL: http://svn.apache.org/viewvc/giraph/trunk/CHANGELOG?rev=1408926&r1=1408925&r2=1408926&view=diff
==============================================================================
--- giraph/trunk/CHANGELOG (original)
+++ giraph/trunk/CHANGELOG Tue Nov 13 20:03:07 2012
@@ -1,6 +1,9 @@
Giraph Change Log
Release 0.2.0 - unreleased
+ GIRAPH-386: ClassCastException when giraph.SplitMasterWorker=false
+ (majakabiljo)
+
GIRAPH-423: Allow overriding addEdge (apresta)
GIRAPH-422: Setting the log level of the root logger to the same level
Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/bsp/CentralizedServiceMaster.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/bsp/CentralizedServiceMaster.java?rev=1408926&r1=1408925&r2=1408926&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/bsp/CentralizedServiceMaster.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/bsp/CentralizedServiceMaster.java Tue Nov 13 20:03:07 2012
@@ -21,6 +21,7 @@ package org.apache.giraph.bsp;
import java.io.IOException;
import org.apache.giraph.graph.MasterAggregatorHandler;
+import org.apache.giraph.graph.MasterInfo;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.zookeeper.KeeperException;
@@ -50,6 +51,13 @@ public interface CentralizedServiceMaste
boolean becomeMaster();
/**
+ * Get master information
+ *
+ * @return Master information
+ */
+ MasterInfo getMasterInfo();
+
+ /**
* Create the {@link InputSplit} objects from the index range based on the
* user-defined VertexInputFormat. The {@link InputSplit} objects will
* be processed by the workers later on during the INPUT_SUPERSTEP.
Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java?rev=1408926&r1=1408925&r2=1408926&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java Tue Nov 13 20:03:07 2012
@@ -26,6 +26,7 @@ import org.apache.giraph.comm.ServerData
import org.apache.giraph.comm.WorkerClient;
import org.apache.giraph.graph.FinishedSuperstepStats;
import org.apache.giraph.graph.GraphState;
+import org.apache.giraph.graph.MasterInfo;
import org.apache.giraph.graph.VertexEdgeCount;
import org.apache.giraph.graph.WorkerAggregatorHandler;
import org.apache.giraph.graph.partition.PartitionStore;
@@ -197,7 +198,7 @@ public interface CentralizedServiceWorke
*
* @return Master info
*/
- WorkerInfo getMasterInfo();
+ MasterInfo getMasterInfo();
/**
* Get the GraphMapper that this service is using. Vertices need to know
Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/WorkerClient.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/WorkerClient.java?rev=1408926&r1=1408925&r2=1408926&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/WorkerClient.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/WorkerClient.java Tue Nov 13 20:03:07 2012
@@ -71,10 +71,10 @@ else[HADOOP_NON_SECURE]*/
/**
* Send a request to a remote server (should be already connected)
*
- * @param destWorkerId Destination worker id
+ * @param destTaskId Destination task id
* @param request Request to send
*/
- void sendWritableRequest(Integer destWorkerId, WritableRequest request);
+ void sendWritableRequest(Integer destTaskId, WritableRequest request);
/**
* Wait until all the outstanding requests are completed.
Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/WorkerServer.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/WorkerServer.java?rev=1408926&r1=1408925&r2=1408926&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/WorkerServer.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/WorkerServer.java Tue Nov 13 20:03:07 2012
@@ -23,6 +23,7 @@ import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import java.io.Closeable;
+import java.net.InetSocketAddress;
/**
* Interface for message communication server.
@@ -37,11 +38,11 @@ public interface WorkerServer<I extends
V extends Writable, E extends Writable, M extends Writable>
extends Closeable {
/**
- * Get the port
+ * Get server address
*
- * @return Port used by this server
+ * @return Address used by this server
*/
- int getPort();
+ InetSocketAddress getMyAddress();
/**
* Prepare incoming messages for computation, and resolve mutation requests.
Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyClient.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyClient.java?rev=1408926&r1=1408925&r2=1408926&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyClient.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyClient.java Tue Nov 13 20:03:07 2012
@@ -48,7 +48,7 @@ import org.apache.giraph.comm.requests.R
import org.apache.giraph.comm.requests.SaslTokenMessageRequest;
/*end[HADOOP_NON_SECURE]*/
import org.apache.giraph.comm.requests.WritableRequest;
-import org.apache.giraph.graph.WorkerInfo;
+import org.apache.giraph.graph.TaskInfo;
import org.apache.giraph.utils.TimedLogger;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.log4j.Logger;
@@ -140,8 +140,8 @@ else[HADOOP_NON_SECURE]*/
/** Address request id generator */
private final AddressRequestIdGenerator addressRequestIdGenerator =
new AddressRequestIdGenerator();
- /** Client id */
- private final int clientId;
+ /** Task info */
+ private final TaskInfo myTaskInfo;
/** Maximum thread pool size */
private final int maxPoolSize;
/** Maximum number of attempts to resolve an address*/
@@ -156,10 +156,13 @@ else[HADOOP_NON_SECURE]*/
*
* @param context Context for progress
* @param conf Configuration
+ * @param myTaskInfo Current task info
*/
public NettyClient(Mapper<?, ?, ?, ?>.Context context,
- final ImmutableClassesGiraphConfiguration conf) {
+ final ImmutableClassesGiraphConfiguration conf,
+ TaskInfo myTaskInfo) {
this.context = context;
+ this.myTaskInfo = myTaskInfo;
this.channelsPerServer = conf.getInt(
GiraphConfiguration.CHANNELS_PER_SERVER,
GiraphConfiguration.DEFAULT_CHANNELS_PER_SERVER);
@@ -239,8 +242,6 @@ else[HADOOP_NON_SECURE]*/
new ThreadFactoryBuilder().setNameFormat(
"netty-client-worker-%d").build());
- clientId = conf.getInt("mapred.task.partition", -1);
-
// Configure the client.
bootstrap = new ClientBootstrap(
new NioClientSocketChannelFactory(
@@ -353,10 +354,10 @@ else[HADOOP_NON_SECURE]*/
*
* @param tasks Tasks to connect to (if haven't already connected)
*/
- public void connectAllAddresses(Collection<WorkerInfo> tasks) {
+ public void connectAllAddresses(Collection<? extends TaskInfo> tasks) {
List<ChannelFutureAddress> waitingConnectionList =
Lists.newArrayListWithCapacity(tasks.size() * channelsPerServer);
- for (WorkerInfo taskInfo : tasks) {
+ for (TaskInfo taskInfo : tasks) {
context.progress();
InetSocketAddress address = taskIdAddressMap.get(taskInfo.getTaskId());
if (address == null ||
@@ -622,12 +623,12 @@ else[HADOOP_NON_SECURE]*/
/**
* Send a request to a remote server (should be already connected)
*
- * @param destWorkerId Destination worker id
+ * @param destTaskId Destination task id
* @param request Request to send
*/
- public void sendWritableRequest(Integer destWorkerId,
+ public void sendWritableRequest(Integer destTaskId,
WritableRequest request) {
- InetSocketAddress remoteServer = taskIdAddressMap.get(destWorkerId);
+ InetSocketAddress remoteServer = taskIdAddressMap.get(destTaskId);
if (clientRequestIdRequestInfoMap.isEmpty()) {
byteCounter.resetAll();
}
@@ -642,11 +643,11 @@ else[HADOOP_NON_SECURE]*/
Channel channel = getNextChannel(remoteServer);
RequestInfo newRequestInfo = new RequestInfo(remoteServer, request);
if (registerRequest) {
- request.setClientId(clientId);
+ request.setClientId(myTaskInfo.getTaskId());
request.setRequestId(
addressRequestIdGenerator.getNextRequestId(remoteServer));
ClientRequestId clientRequestId =
- new ClientRequestId(destWorkerId, request.getRequestId());
+ new ClientRequestId(destTaskId, request.getRequestId());
RequestInfo oldRequestInfo = clientRequestIdRequestInfoMap.putIfAbsent(
clientRequestId, newRequestInfo);
if (oldRequestInfo != null) {
Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyMasterClient.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyMasterClient.java?rev=1408926&r1=1408925&r2=1408926&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyMasterClient.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyMasterClient.java Tue Nov 13 20:03:07 2012
@@ -58,7 +58,8 @@ public class NettyMasterClient implement
public NettyMasterClient(Mapper<?, ?, ?, ?>.Context context,
ImmutableClassesGiraphConfiguration configuration,
CentralizedServiceMaster<?, ?, ?, ?> service) {
- this.nettyClient = new NettyClient(context, configuration);
+ this.nettyClient =
+ new NettyClient(context, configuration, service.getMasterInfo());
this.service = service;
this.progressable = context;
maxBytesPerAggregatorRequest = configuration.getInt(
Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyMasterServer.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyMasterServer.java?rev=1408926&r1=1408925&r2=1408926&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyMasterServer.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyMasterServer.java Tue Nov 13 20:03:07 2012
@@ -19,9 +19,9 @@
package org.apache.giraph.comm.netty;
import org.apache.giraph.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.bsp.CentralizedServiceMaster;
import org.apache.giraph.comm.netty.handler.MasterRequestServerHandler;
import org.apache.giraph.comm.MasterServer;
-import org.apache.giraph.graph.MasterAggregatorHandler;
import java.net.InetSocketAddress;
@@ -36,12 +36,13 @@ public class NettyMasterServer implement
* Constructor
*
* @param conf Hadoop configuration
- * @param aggregatorHandler Master aggregator handler
+ * @param service Centralized service
*/
public NettyMasterServer(ImmutableClassesGiraphConfiguration conf,
- MasterAggregatorHandler aggregatorHandler) {
+ CentralizedServiceMaster<?, ?, ?, ?> service) {
nettyServer = new NettyServer(conf,
- new MasterRequestServerHandler.Factory(aggregatorHandler));
+ new MasterRequestServerHandler.Factory(service.getAggregatorHandler()),
+ service.getMasterInfo());
nettyServer.start();
}
Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyServer.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyServer.java?rev=1408926&r1=1408925&r2=1408926&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyServer.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyServer.java Tue Nov 13 20:03:07 2012
@@ -39,6 +39,7 @@ import org.apache.giraph.comm.netty.hand
import java.util.concurrent.TimeUnit;
import org.apache.giraph.GiraphConfiguration;
import org.apache.giraph.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.graph.TaskInfo;
import org.apache.log4j.Logger;
import org.jboss.netty.bootstrap.ServerBootstrap;
import org.jboss.netty.channel.Channel;
@@ -87,6 +88,8 @@ else[HADOOP_NON_SECURE]*/
private final String localHostname;
/** Address of the server */
private InetSocketAddress myAddress;
+ /** Current task info */
+ private TaskInfo myTaskInfo;
/** Maximum number of threads */
private final int maxPoolSize;
/** TCP backlog */
@@ -122,15 +125,18 @@ else[HADOOP_NON_SECURE]*/
*
* @param conf Configuration to use
* @param requestServerHandlerFactory Factory for request handlers
+ * @param myTaskInfo Current task info
*/
public NettyServer(ImmutableClassesGiraphConfiguration conf,
- RequestServerHandler.Factory requestServerHandlerFactory) {
+ RequestServerHandler.Factory requestServerHandlerFactory,
+ TaskInfo myTaskInfo) {
this.conf = conf;
this.requestServerHandlerFactory = requestServerHandlerFactory;
/*if[HADOOP_NON_SECURE]
else[HADOOP_NON_SECURE]*/
this.saslServerHandlerFactory = new SaslServerHandler.Factory();
/*end[HADOOP_NON_SECURE]*/
+ this.myTaskInfo = myTaskInfo;
sendBufferSize = conf.getInt(
GiraphConfiguration.SERVER_SEND_BUFFER_SIZE,
GiraphConfiguration.DEFAULT_SERVER_SEND_BUFFER_SIZE);
@@ -196,12 +202,14 @@ else[HADOOP_NON_SECURE]*/
*
* @param conf Configuration to use
* @param requestServerHandlerFactory Factory for request handlers
+ * @param myTaskInfo Current task info
* @param saslServerHandlerFactory Factory for SASL handlers
*/
public NettyServer(ImmutableClassesGiraphConfiguration conf,
RequestServerHandler.Factory requestServerHandlerFactory,
+ TaskInfo myTaskInfo,
SaslServerHandler.Factory saslServerHandlerFactory) {
- this(conf, requestServerHandlerFactory);
+ this(conf, requestServerHandlerFactory, myTaskInfo);
this.saslServerHandlerFactory = saslServerHandlerFactory;
}
/*end[HADOOP_NON_SECURE]*/
@@ -247,7 +255,7 @@ else[HADOOP_NON_SECURE]*/
saslServerHandlerFactory.newHandler(conf),
new AuthorizeServerHandler(),
requestServerHandlerFactory.newHandler(workerRequestReservedMap,
- conf),
+ conf, myTaskInfo),
// Removed after authentication completes:
new ResponseEncoder());
} else {
@@ -263,7 +271,7 @@ else[HADOOP_NON_SECURE]*/
new RequestDecoder(conf, byteCounter));
pipeline.addLast("requestProcessor",
requestServerHandlerFactory.newHandler(
- workerRequestReservedMap, conf));
+ workerRequestReservedMap, conf, myTaskInfo));
if (executionHandler != null) {
pipeline.addAfter(handlerBeforeExecutionHandler,
"executionHandler", executionHandler);
Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClient.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClient.java?rev=1408926&r1=1408925&r2=1408926&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClient.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClient.java Tue Nov 13 20:03:07 2012
@@ -22,6 +22,7 @@ import org.apache.giraph.ImmutableClasse
import org.apache.giraph.bsp.CentralizedServiceWorker;
import org.apache.giraph.comm.WorkerClient;
import org.apache.giraph.comm.requests.WritableRequest;
+import org.apache.giraph.graph.TaskInfo;
import org.apache.giraph.graph.WorkerInfo;
import org.apache.giraph.graph.partition.PartitionOwner;
import org.apache.hadoop.io.Writable;
@@ -67,7 +68,8 @@ public class NettyWorkerClient<I extends
Mapper<?, ?, ?, ?>.Context context,
ImmutableClassesGiraphConfiguration<I, V, E, M> configuration,
CentralizedServiceWorker<I, V, E, M> service) {
- this.nettyClient = new NettyClient(context, configuration);
+ this.nettyClient =
+ new NettyClient(context, configuration, service.getWorkerInfo());
this.conf = configuration;
this.service = service;
}
@@ -78,7 +80,7 @@ public class NettyWorkerClient<I extends
@Override
public void openConnections() {
- List<WorkerInfo> addresses = Lists.newArrayListWithCapacity(
+ List<TaskInfo> addresses = Lists.newArrayListWithCapacity(
service.getWorkerInfoList().size());
for (WorkerInfo info : service.getWorkerInfoList()) {
// No need to connect to myself
@@ -96,9 +98,9 @@ public class NettyWorkerClient<I extends
}
@Override
- public void sendWritableRequest(Integer destWorkerId,
+ public void sendWritableRequest(Integer destTaskId,
WritableRequest request) {
- nettyClient.sendWritableRequest(destWorkerId, request);
+ nettyClient.sendWritableRequest(destTaskId, request);
}
@Override
Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java?rev=1408926&r1=1408925&r2=1408926&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java Tue Nov 13 20:03:07 2012
@@ -45,6 +45,7 @@ import org.apache.log4j.Logger;
import com.google.common.collect.Sets;
+import java.net.InetSocketAddress;
import java.util.Set;
/**
@@ -89,7 +90,8 @@ public class NettyWorkerServer<I extends
new ServerData<I, V, E, M>(conf, createMessageStoreFactory(), context);
nettyServer = new NettyServer(conf,
- new WorkerRequestServerHandler.Factory<I, V, E, M>(serverData));
+ new WorkerRequestServerHandler.Factory<I, V, E, M>(serverData),
+ service.getWorkerInfo());
nettyServer.start();
}
@@ -138,8 +140,8 @@ public class NettyWorkerServer<I extends
}
@Override
- public int getPort() {
- return nettyServer.getMyAddress().getPort();
+ public InetSocketAddress getMyAddress() {
+ return nettyServer.getMyAddress();
}
@Override
Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/handler/ClientRequestId.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/handler/ClientRequestId.java?rev=1408926&r1=1408925&r2=1408926&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/handler/ClientRequestId.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/handler/ClientRequestId.java Tue Nov 13 20:03:07 2012
@@ -21,27 +21,27 @@ package org.apache.giraph.comm.netty.han
/**
* Simple immutable object to use for tracking requests uniquely. This
* object is guaranteed to be unique for a given client (based on the
- * destination worker and the request).
+ * destination task and the request).
*/
public class ClientRequestId {
- /** Destination worker id */
- private final int destinationWorkerId;
+ /** Destination task id */
+ private final int destinationTaskId;
/** Request id */
private final long requestId;
/**
* Constructor.
*
- * @param destinationWorkerId Destination worker id
+ * @param destinationTaskId Destination task id
* @param requestId Request id
*/
- public ClientRequestId(int destinationWorkerId, long requestId) {
- this.destinationWorkerId = destinationWorkerId;
+ public ClientRequestId(int destinationTaskId, long requestId) {
+ this.destinationTaskId = destinationTaskId;
this.requestId = requestId;
}
- public int getDestinationWorkerId() {
- return destinationWorkerId;
+ public int getDestinationTaskId() {
+ return destinationTaskId;
}
public long getRequestId() {
@@ -50,7 +50,7 @@ public class ClientRequestId {
@Override
public int hashCode() {
- return (29 * destinationWorkerId) + (int) (57 * requestId);
+ return (29 * destinationTaskId) + (int) (57 * requestId);
}
@Override
@@ -58,7 +58,7 @@ public class ClientRequestId {
if (other instanceof ClientRequestId) {
ClientRequestId otherObj = (ClientRequestId) other;
if (otherObj.getRequestId() == requestId &&
- otherObj.getDestinationWorkerId() == destinationWorkerId) {
+ otherObj.getDestinationTaskId() == destinationTaskId) {
return true;
}
}
@@ -68,6 +68,6 @@ public class ClientRequestId {
@Override
public String toString() {
- return "(destWorker=" + destinationWorkerId + ",reqId=" + requestId + ")";
+ return "(destTask=" + destinationTaskId + ",reqId=" + requestId + ")";
}
}
Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/handler/MasterRequestServerHandler.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/handler/MasterRequestServerHandler.java?rev=1408926&r1=1408925&r2=1408926&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/handler/MasterRequestServerHandler.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/handler/MasterRequestServerHandler.java Tue Nov 13 20:03:07 2012
@@ -18,9 +18,10 @@
package org.apache.giraph.comm.netty.handler;
+import org.apache.giraph.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.comm.requests.MasterRequest;
import org.apache.giraph.graph.MasterAggregatorHandler;
-import org.apache.hadoop.conf.Configuration;
+import org.apache.giraph.graph.TaskInfo;
/** Handler for requests on master */
public class MasterRequestServerHandler extends
@@ -33,12 +34,15 @@ public class MasterRequestServerHandler
*
* @param workerRequestReservedMap Worker request reservation map
* @param conf Configuration
+ * @param myTaskInfo Current task info
* @param aggregatorHandler Master aggregator handler
*/
public MasterRequestServerHandler(
- WorkerRequestReservedMap workerRequestReservedMap, Configuration conf,
+ WorkerRequestReservedMap workerRequestReservedMap,
+ ImmutableClassesGiraphConfiguration conf,
+ TaskInfo myTaskInfo,
MasterAggregatorHandler aggregatorHandler) {
- super(workerRequestReservedMap, conf);
+ super(workerRequestReservedMap, conf, myTaskInfo);
this.aggregatorHandler = aggregatorHandler;
}
@@ -66,9 +70,10 @@ public class MasterRequestServerHandler
@Override
public RequestServerHandler newHandler(
WorkerRequestReservedMap workerRequestReservedMap,
- Configuration conf) {
+ ImmutableClassesGiraphConfiguration conf,
+ TaskInfo myTaskInfo) {
return new MasterRequestServerHandler(workerRequestReservedMap, conf,
- aggregatorHandler);
+ myTaskInfo, aggregatorHandler);
}
}
}
Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/handler/RequestServerHandler.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/handler/RequestServerHandler.java?rev=1408926&r1=1408925&r2=1408926&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/handler/RequestServerHandler.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/handler/RequestServerHandler.java Tue Nov 13 20:03:07 2012
@@ -19,11 +19,12 @@
package org.apache.giraph.comm.netty.handler;
import org.apache.giraph.GiraphConfiguration;
+import org.apache.giraph.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.comm.requests.WritableRequest;
+import org.apache.giraph.graph.TaskInfo;
import org.apache.giraph.utils.SystemTime;
import org.apache.giraph.utils.Time;
import org.apache.giraph.utils.Times;
-import org.apache.hadoop.conf.Configuration;
import org.apache.log4j.Logger;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
@@ -53,8 +54,8 @@ public abstract class RequestServerHandl
private final boolean closeFirstRequest;
/** Request reserved map (for exactly once semantics) */
private final WorkerRequestReservedMap workerRequestReservedMap;
- /** My worker id */
- private final int myWorkerId;
+ /** My task info */
+ private final TaskInfo myTaskInfo;
/** Start nanoseconds for the processing time */
private long startProcessingNanoseconds = -1;
@@ -63,15 +64,17 @@ public abstract class RequestServerHandl
*
* @param workerRequestReservedMap Worker request reservation map
* @param conf Configuration
+ * @param myTaskInfo Current task info
*/
public RequestServerHandler(
WorkerRequestReservedMap workerRequestReservedMap,
- Configuration conf) {
+ ImmutableClassesGiraphConfiguration conf,
+ TaskInfo myTaskInfo) {
this.workerRequestReservedMap = workerRequestReservedMap;
closeFirstRequest = conf.getBoolean(
GiraphConfiguration.NETTY_SIMULATE_FIRST_REQUEST_CLOSED,
GiraphConfiguration.NETTY_SIMULATE_FIRST_REQUEST_CLOSED_DEFAULT);
- myWorkerId = conf.getInt("mapred.task.partition", -1);
+ this.myTaskInfo = myTaskInfo;
}
@Override
@@ -120,7 +123,7 @@ public abstract class RequestServerHandl
// Send the response with the request id
ChannelBuffer buffer = ChannelBuffers.directBuffer(RESPONSE_BYTES);
- buffer.writeInt(myWorkerId);
+ buffer.writeInt(myTaskInfo.getTaskId());
buffer.writeLong(writableRequest.getRequestId());
buffer.writeByte(alreadyDone);
e.getChannel().write(buffer);
@@ -167,10 +170,12 @@ public abstract class RequestServerHandl
*
* @param workerRequestReservedMap Worker request reservation map
* @param conf Configuration to use
+ * @param myTaskInfo Current task info
* @return New {@link RequestServerHandler}
*/
RequestServerHandler newHandler(
WorkerRequestReservedMap workerRequestReservedMap,
- Configuration conf);
+ ImmutableClassesGiraphConfiguration conf,
+ TaskInfo myTaskInfo);
}
}
Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/handler/WorkerRequestServerHandler.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/handler/WorkerRequestServerHandler.java?rev=1408926&r1=1408925&r2=1408926&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/handler/WorkerRequestServerHandler.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/handler/WorkerRequestServerHandler.java Tue Nov 13 20:03:07 2012
@@ -18,9 +18,10 @@
package org.apache.giraph.comm.netty.handler;
+import org.apache.giraph.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.comm.ServerData;
import org.apache.giraph.comm.requests.WorkerRequest;
-import org.apache.hadoop.conf.Configuration;
+import org.apache.giraph.graph.TaskInfo;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
@@ -44,11 +45,13 @@ public class WorkerRequestServerHandler<
* @param serverData Data held by the server
* @param workerRequestReservedMap Worker request reservation map
* @param conf Configuration
+ * @param myTaskInfo Current task info
*/
public WorkerRequestServerHandler(ServerData<I, V, E, M> serverData,
WorkerRequestReservedMap workerRequestReservedMap,
- Configuration conf) {
- super(workerRequestReservedMap, conf);
+ ImmutableClassesGiraphConfiguration conf,
+ TaskInfo myTaskInfo) {
+ super(workerRequestReservedMap, conf, myTaskInfo);
this.serverData = serverData;
}
@@ -76,9 +79,10 @@ public class WorkerRequestServerHandler<
@Override
public RequestServerHandler newHandler(
WorkerRequestReservedMap workerRequestReservedMap,
- Configuration conf) {
+ ImmutableClassesGiraphConfiguration conf,
+ TaskInfo myTaskInfo) {
return new WorkerRequestServerHandler<I, V, E,
- M>(serverData, workerRequestReservedMap, conf);
+ M>(serverData, workerRequestReservedMap, conf, myTaskInfo);
}
}
}
Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/AddressesAndPartitionsWritable.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/AddressesAndPartitionsWritable.java?rev=1408926&r1=1408925&r2=1408926&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/AddressesAndPartitionsWritable.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/AddressesAndPartitionsWritable.java Tue Nov 13 20:03:07 2012
@@ -34,7 +34,7 @@ import java.util.List;
*/
public class AddressesAndPartitionsWritable implements Writable {
/** Master information */
- private WorkerInfo masterInfo;
+ private MasterInfo masterInfo;
/** List of all workers */
private List<WorkerInfo> workerInfos;
/** Collection of partitions */
@@ -49,7 +49,7 @@ public class AddressesAndPartitionsWrita
* @param workerInfos List of all workers
* @param partitionOwners Collection of partitions
*/
- public AddressesAndPartitionsWritable(WorkerInfo masterInfo,
+ public AddressesAndPartitionsWritable(MasterInfo masterInfo,
List<WorkerInfo> workerInfos,
Collection<PartitionOwner> partitionOwners) {
this.masterInfo = masterInfo;
@@ -72,7 +72,7 @@ public class AddressesAndPartitionsWrita
*
* @return Master information
*/
- public WorkerInfo getMasterInfo() {
+ public MasterInfo getMasterInfo() {
return masterInfo;
}
@@ -111,7 +111,7 @@ public class AddressesAndPartitionsWrita
@Override
public void readFields(DataInput input) throws IOException {
- masterInfo = new WorkerInfo();
+ masterInfo = new MasterInfo();
masterInfo.readFields(input);
int workerInfosSize = input.readInt();
Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspService.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspService.java?rev=1408926&r1=1408925&r2=1408926&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspService.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspService.java Tue Nov 13 20:03:07 2012
@@ -276,7 +276,7 @@ public abstract class BspService<I exten
this.conf = new ImmutableClassesGiraphConfiguration<I, V, E, M>(
context.getConfiguration());
this.jobId = conf.get("mapred.job.id", "Unknown Job");
- this.taskPartition = conf.getInt("mapred.task.partition", -1);
+ this.taskPartition = conf.getTaskPartition();
this.restartedSuperstep = conf.getLong(
GiraphConfiguration.RESTART_SUPERSTEP,
UNSET_SUPERSTEP);
Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspServiceMaster.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspServiceMaster.java?rev=1408926&r1=1408925&r2=1408926&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspServiceMaster.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspServiceMaster.java Tue Nov 13 20:03:07 2012
@@ -24,8 +24,10 @@ import org.apache.giraph.bsp.Application
import org.apache.giraph.bsp.BspInputFormat;
import org.apache.giraph.bsp.CentralizedServiceMaster;
import org.apache.giraph.bsp.SuperstepState;
-import org.apache.giraph.comm.MasterClientServer;
-import org.apache.giraph.comm.netty.NettyMasterClientServer;
+import org.apache.giraph.comm.MasterClient;
+import org.apache.giraph.comm.MasterServer;
+import org.apache.giraph.comm.netty.NettyMasterClient;
+import org.apache.giraph.comm.netty.NettyMasterServer;
import org.apache.giraph.counters.GiraphStats;
import org.apache.giraph.graph.GraphMapper.MapFunctions;
import org.apache.giraph.graph.partition.MasterGraphPartitioner;
@@ -140,10 +142,12 @@ public class BspServiceMaster<I extends
private MasterAggregatorHandler aggregatorHandler;
/** Master class */
private MasterCompute masterCompute;
- /** Communication service */
- private MasterClientServer commService;
+ /** IPC Client */
+ private MasterClient masterClient;
+ /** IPC Server */
+ private MasterServer masterServer;
/** Master info */
- private WorkerInfo masterInfo;
+ private MasterInfo masterInfo;
/** List of workers in current superstep */
private List<WorkerInfo> chosenWorkerInfoList = Lists.newArrayList();
/** Limit locality information added to each InputSplit znode */
@@ -786,10 +790,11 @@ public class BspServiceMaster<I extends
getContext());
aggregatorHandler.initialize(this);
- commService = new NettyMasterClientServer(
- getContext(), getConfiguration(), this);
- masterInfo = new WorkerInfo(getHostname(), getTaskPartition(),
- commService.getMyAddress().getPort());
+ masterInfo = new MasterInfo();
+ masterServer = new NettyMasterServer(getConfiguration(), this);
+ masterInfo.setInetSocketAddress(masterServer.getMyAddress());
+ masterClient =
+ new NettyMasterClient(getContext(), getConfiguration(), this);
if (LOG.isInfoEnabled()) {
LOG.info("becomeMaster: I am now the master!");
@@ -810,6 +815,11 @@ public class BspServiceMaster<I extends
}
}
+ @Override
+ public MasterInfo getMasterInfo() {
+ return masterInfo;
+ }
+
/**
* Collect and aggregate the worker statistics for a particular superstep.
*
@@ -1350,7 +1360,7 @@ public class BspServiceMaster<I extends
}
}
- commService.openConnections();
+ masterClient.openConnections();
GiraphStats.getInstance().
getCurrentWorkers().setValue(chosenWorkerInfoList.size());
@@ -1361,7 +1371,7 @@ public class BspServiceMaster<I extends
// We need to finalize aggregators from previous superstep (send them to
// worker owners) after new worker assignments
if (getSuperstep() >= 0) {
- aggregatorHandler.finishSuperstep(commService);
+ aggregatorHandler.finishSuperstep(masterClient);
}
// Finalize the valid checkpoint file prefixes and possibly
@@ -1406,7 +1416,7 @@ public class BspServiceMaster<I extends
// Collect aggregator values, then run the master.compute() and
// finally save the aggregator values
- aggregatorHandler.prepareSuperstep(commService);
+ aggregatorHandler.prepareSuperstep(masterClient);
runMasterCompute(getSuperstep());
// If the master is halted or all the vertices voted to halt and there
@@ -1614,8 +1624,8 @@ public class BspServiceMaster<I extends
}
aggregatorHandler.close();
- commService.closeConnections();
- commService.close();
+ masterClient.closeConnections();
+ masterServer.close();
}
try {
Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspServiceWorker.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspServiceWorker.java?rev=1408926&r1=1408925&r2=1408926&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspServiceWorker.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspServiceWorker.java Tue Nov 13 20:03:07 2012
@@ -125,7 +125,7 @@ public class BspServiceWorker<I extends
private final WorkerAggregatorRequestProcessor
workerAggregatorRequestProcessor;
/** Master info */
- private WorkerInfo masterInfo = new WorkerInfo();
+ private MasterInfo masterInfo = new MasterInfo();
/** List of workers */
private List<WorkerInfo> workerInfoList = Lists.newArrayList();
/** Have the partition exchange children (workers) changed? */
@@ -170,17 +170,17 @@ public class BspServiceWorker<I extends
registerBspEvent(partitionExchangeChildrenChanged);
workerGraphPartitioner =
getGraphPartitionerFactory().createWorkerGraphPartitioner();
- workerServer = new NettyWorkerServer<I, V, E, M>(getConfiguration(),
- this, context);
- workerClient = new NettyWorkerClient<I, V, E, M>(context,
- getConfiguration(), this);
+ workerInfo = new WorkerInfo(getTaskPartition());
+ workerServer =
+ new NettyWorkerServer<I, V, E, M>(getConfiguration(), this, context);
+ workerInfo.setInetSocketAddress(workerServer.getMyAddress());
+ workerClient =
+ new NettyWorkerClient<I, V, E, M>(context, getConfiguration(), this);
workerAggregatorRequestProcessor =
new NettyWorkerAggregatorRequestProcessor(getContext(),
getConfiguration(), this);
- workerInfo = new WorkerInfo(
- getHostname(), getTaskPartition(), workerServer.getPort());
this.workerContext = getConfiguration().createWorkerContext(null);
aggregatorHandler =
@@ -334,7 +334,7 @@ public class BspServiceWorker<I extends
}
@Override
- public WorkerInfo getMasterInfo() {
+ public MasterInfo getMasterInfo() {
return masterInfo;
}
Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/MasterAggregatorHandler.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/MasterAggregatorHandler.java?rev=1408926&r1=1408925&r2=1408926&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/MasterAggregatorHandler.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/MasterAggregatorHandler.java Tue Nov 13 20:03:07 2012
@@ -20,7 +20,7 @@ package org.apache.giraph.graph;
import org.apache.giraph.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.bsp.SuperstepState;
-import org.apache.giraph.comm.MasterClientServer;
+import org.apache.giraph.comm.MasterClient;
import org.apache.giraph.comm.aggregators.AggregatorUtils;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.util.Progressable;
@@ -145,9 +145,9 @@ public class MasterAggregatorHandler imp
/**
* Prepare aggregators for current superstep
*
- * @param commService Communication service
+ * @param masterClient IPC client on master
*/
- public void prepareSuperstep(MasterClientServer commService) {
+ public void prepareSuperstep(MasterClient masterClient) {
if (LOG.isDebugEnabled()) {
LOG.debug("prepareSuperstep: Start preapring aggregators");
}
@@ -169,9 +169,9 @@ public class MasterAggregatorHandler imp
/**
* Finalize aggregators for current superstep and share them with workers
*
- * @param commService Communication service
+ * @param masterClient IPC client on master
*/
- public void finishSuperstep(MasterClientServer commService) {
+ public void finishSuperstep(MasterClient masterClient) {
if (LOG.isDebugEnabled()) {
LOG.debug("finishSuperstep: Start finishing aggregators");
}
@@ -192,12 +192,12 @@ public class MasterAggregatorHandler imp
try {
for (Map.Entry<String, AggregatorWrapper<Writable>> entry :
aggregatorMap.entrySet()) {
- commService.sendAggregator(entry.getKey(),
+ masterClient.sendAggregator(entry.getKey(),
entry.getValue().getAggregatorClass(),
entry.getValue().getPreviousAggregatedValue());
progressable.progress();
}
- commService.finishSendingAggregatedValues();
+ masterClient.finishSendingAggregatedValues();
} catch (IOException e) {
throw new IllegalStateException("finishSuperstep: " +
"IOException occurred while sending aggregators", e);
Added: giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/MasterInfo.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/MasterInfo.java?rev=1408926&view=auto
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/MasterInfo.java (added)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/MasterInfo.java Tue Nov 13 20:03:07 2012
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.graph;
+
+/**
+ * Information about the master that is sent to other workers.
+ */
+public class MasterInfo extends TaskInfo {
+ /**
+ * Constructor
+ */
+ public MasterInfo() {
+ }
+
+ @Override
+ public int getTaskId() {
+ return -1;
+ }
+
+ @Override
+ public String toString() {
+ return "Master(hostname=" + getHostname() + ", port=" + getPort() + ")";
+ }
+}
Added: giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/TaskInfo.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/TaskInfo.java?rev=1408926&view=auto
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/TaskInfo.java (added)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/TaskInfo.java Tue Nov 13 20:03:07 2012
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.graph;
+
+import org.apache.hadoop.io.Writable;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+/**
+ * Abstract class for information about any task - worker or master.
+ */
+public abstract class TaskInfo implements Writable {
+ /** Task hostname */
+ private String hostname;
+ /** Port that the IPC server is using */
+ private int port;
+
+ /**
+ * Constructor
+ */
+ public TaskInfo() {
+ }
+
+ /**
+ * Get this task's hostname
+ *
+ * @return Hostname
+ */
+ public String getHostname() {
+ return hostname;
+ }
+
+ /**
+ * Get port that the IPC server of this task is using
+ *
+ * @return Port
+ */
+ public int getPort() {
+ return port;
+ }
+
+ /**
+ * Set address that the IPC server of this task is using
+ *
+ * @param address Address
+ */
+ public void setInetSocketAddress(InetSocketAddress address) {
+ this.port = address.getPort();
+ this.hostname = address.getHostName();
+ }
+
+ /**
+ * Get a new instance of the InetSocketAddress for this hostname and port
+ *
+ * @return InetSocketAddress of the hostname and port.
+ */
+ public InetSocketAddress getInetSocketAddress() {
+ return new InetSocketAddress(hostname, port);
+ }
+
+ /**
+ * Get task partition id of this task
+ *
+ * @return Task partition id of this task
+ */
+ public abstract int getTaskId();
+
+ @Override
+ public boolean equals(Object other) {
+ if (other instanceof TaskInfo) {
+ TaskInfo taskInfo = (TaskInfo) other;
+ if (hostname.equals(taskInfo.getHostname()) &&
+ (getTaskId() == taskInfo.getTaskId()) &&
+ (port == taskInfo.getPort())) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ @Override
+ public void readFields(DataInput input) throws IOException {
+ hostname = input.readUTF();
+ port = input.readInt();
+ }
+
+ @Override
+ public void write(DataOutput output) throws IOException {
+ output.writeUTF(hostname);
+ output.writeInt(port);
+ }
+
+ @Override
+ public int hashCode() {
+ int result = 17;
+ result = 37 * result + getPort();
+ result = 37 * result + hostname.hashCode();
+ result = 37 * result + getTaskId();
+ return result;
+ }
+}
Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/WorkerInfo.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/WorkerInfo.java?rev=1408926&r1=1408925&r2=1408926&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/WorkerInfo.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/WorkerInfo.java Tue Nov 13 20:03:07 2012
@@ -23,18 +23,12 @@ import java.io.DataOutput;
import java.io.IOException;
import java.net.InetSocketAddress;
-import org.apache.hadoop.io.Writable;
-
/**
* Information about a worker that is sent to the master and other workers.
*/
-public class WorkerInfo implements Writable {
- /** Worker hostname */
- private String hostname;
- /** Task Partition (Worker) ID of this worker */
- private int taskId = -1;
- /** Port that the IPC server is using */
- private int port = -1;
+public class WorkerInfo extends TaskInfo {
+ /** Task Partition (Worker) ID of this task */
+ private int taskId;
/** Hostname + "_" + id for easier debugging */
private String hostnameId;
@@ -47,92 +41,43 @@ public class WorkerInfo implements Writa
/**
* Constructor with parameters.
*
- * @param hostname Hostname of this worker.
* @param taskId the task partition for this worker
- * @param port Port of the service.
*/
- public WorkerInfo(String hostname, int taskId, int port) {
- this.hostname = hostname;
+ public WorkerInfo(int taskId) {
this.taskId = taskId;
- this.port = port;
- this.hostnameId = hostname + "_" + taskId;
- }
-
- /**
- * Constructor with InetSocketAddress
- *
- * @param address Address of this worker
- * @param taskId The task partition for this worker
- */
- public WorkerInfo(InetSocketAddress address, int taskId) {
- this(address.getHostName(), taskId, address.getPort());
- }
-
- public String getHostname() {
- return hostname;
}
+ @Override
public int getTaskId() {
return taskId;
}
- public String getHostnameId() {
- return hostnameId;
- }
-
- /**
- * Get a new instance of the InetSocketAddress for this hostname and port
- *
- * @return InetSocketAddress of the hostname and port.
- */
- public InetSocketAddress getInetSocketAddress() {
- return new InetSocketAddress(hostname, port);
- }
-
- public int getPort() {
- return port;
- }
-
@Override
- public boolean equals(Object other) {
- if (other instanceof WorkerInfo) {
- WorkerInfo workerInfo = (WorkerInfo) other;
- if (hostname.equals(workerInfo.getHostname()) &&
- (taskId == workerInfo.getTaskId()) &&
- (port == workerInfo.getPort())) {
- return true;
- }
- }
- return false;
+ public void setInetSocketAddress(InetSocketAddress address) {
+ super.setInetSocketAddress(address);
+ hostnameId = getHostname() + "_" + getTaskId();
}
- @Override
- public int hashCode() {
- int result = 17;
- result = 37 * result + port;
- result = 37 * result + hostname.hashCode();
- result = 37 * result + taskId;
- return result;
+ public String getHostnameId() {
+ return hostnameId;
}
@Override
public String toString() {
- return "Worker(hostname=" + hostname + ", MRtaskID=" +
- taskId + ", port=" + port + ")";
+ return "Worker(hostname=" + getHostname() + ", MRtaskID=" +
+ getTaskId() + ", port=" + getPort() + ")";
}
@Override
public void readFields(DataInput input) throws IOException {
- hostname = input.readUTF();
+ super.readFields(input);
taskId = input.readInt();
- port = input.readInt();
- hostnameId = hostname + "_" + taskId;
+ hostnameId = getHostname() + "_" + getTaskId();
}
@Override
public void write(DataOutput output) throws IOException {
- output.writeUTF(hostname);
+ super.write(output);
output.writeInt(taskId);
- output.writeInt(port);
}
}
Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/zk/ZooKeeperManager.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/zk/ZooKeeperManager.java?rev=1408926&r1=1408925&r2=1408926&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/zk/ZooKeeperManager.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/zk/ZooKeeperManager.java Tue Nov 13 20:03:07 2012
@@ -138,7 +138,7 @@ public class ZooKeeperManager {
throws IOException {
this.context = context;
this.conf = configuration;
- taskPartition = conf.getInt("mapred.task.partition", -1);
+ taskPartition = conf.getTaskPartition();
jobId = conf.get("mapred.job.id", "Unknown Job");
baseDirectory =
new Path(conf.get(GiraphConfiguration.ZOOKEEPER_MANAGER_DIRECTORY,
Modified: giraph/trunk/giraph/src/test/java/org/apache/giraph/comm/ConnectionTest.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/test/java/org/apache/giraph/comm/ConnectionTest.java?rev=1408926&r1=1408925&r2=1408926&view=diff
==============================================================================
--- giraph/trunk/giraph/src/test/java/org/apache/giraph/comm/ConnectionTest.java (original)
+++ giraph/trunk/giraph/src/test/java/org/apache/giraph/comm/ConnectionTest.java Tue Nov 13 20:03:07 2012
@@ -74,15 +74,16 @@ public class ConnectionTest {
ServerData<IntWritable, IntWritable, IntWritable, IntWritable> serverData =
MockUtils.createNewServerData(conf, context);
+ WorkerInfo workerInfo = new WorkerInfo(-1);
NettyServer server =
new NettyServer(conf,
- new WorkerRequestServerHandler.Factory(serverData));
+ new WorkerRequestServerHandler.Factory(serverData), workerInfo);
server.start();
+ workerInfo.setInetSocketAddress(server.getMyAddress());
- NettyClient client = new NettyClient(context, conf);
+ NettyClient client = new NettyClient(context, conf, new WorkerInfo());
client.connectAllAddresses(
- Lists.<WorkerInfo>newArrayList(
- new WorkerInfo(server.getMyAddress(), -1)));
+ Lists.<WorkerInfo>newArrayList(workerInfo));
client.stop();
server.stop();
@@ -104,17 +105,25 @@ public class ConnectionTest {
RequestServerHandler.Factory requestServerHandlerFactory =
new WorkerRequestServerHandler.Factory(serverData);
- NettyServer server1 = new NettyServer(conf, requestServerHandlerFactory);
+ WorkerInfo workerInfo1 = new WorkerInfo(1);
+ NettyServer server1 =
+ new NettyServer(conf, requestServerHandlerFactory, workerInfo1);
server1.start();
- NettyServer server2 = new NettyServer(conf, requestServerHandlerFactory);
+ workerInfo1.setInetSocketAddress(server1.getMyAddress());
+
+ WorkerInfo workerInfo2 = new WorkerInfo(2);
+ NettyServer server2 =
+ new NettyServer(conf, requestServerHandlerFactory, workerInfo2);
server2.start();
- NettyServer server3 = new NettyServer(conf, requestServerHandlerFactory);
+ workerInfo2.setInetSocketAddress(server2.getMyAddress());
+
+ WorkerInfo workerInfo3 = new WorkerInfo(3);
+ NettyServer server3 =
+ new NettyServer(conf, requestServerHandlerFactory, workerInfo3);
server3.start();
+ workerInfo3.setInetSocketAddress(server3.getMyAddress());
- NettyClient client = new NettyClient(context, conf);
- WorkerInfo workerInfo1 = new WorkerInfo(server1.getMyAddress(), 1);
- WorkerInfo workerInfo2 = new WorkerInfo(server2.getMyAddress(), 2);
- WorkerInfo workerInfo3 = new WorkerInfo(server3.getMyAddress(), 3);
+ NettyClient client = new NettyClient(context, conf, new WorkerInfo());
List<WorkerInfo> addresses = Lists.<WorkerInfo>newArrayList(workerInfo1,
workerInfo2, workerInfo3);
client.connectAllAddresses(addresses);
@@ -138,18 +147,18 @@ public class ConnectionTest {
ServerData<IntWritable, IntWritable, IntWritable, IntWritable> serverData =
MockUtils.createNewServerData(conf, context);
+ WorkerInfo workerInfo = new WorkerInfo(-1);
NettyServer server = new NettyServer(conf,
- new WorkerRequestServerHandler.Factory(serverData));
+ new WorkerRequestServerHandler.Factory(serverData), workerInfo);
server.start();
+ workerInfo.setInetSocketAddress(server.getMyAddress());
- List<WorkerInfo> addresses =
- Lists.<WorkerInfo>newArrayList(
- new WorkerInfo(server.getMyAddress(), -1));
- NettyClient client1 = new NettyClient(context, conf);
+ List<WorkerInfo> addresses = Lists.<WorkerInfo>newArrayList(workerInfo);
+ NettyClient client1 = new NettyClient(context, conf, new WorkerInfo());
client1.connectAllAddresses(addresses);
- NettyClient client2 = new NettyClient(context, conf);
+ NettyClient client2 = new NettyClient(context, conf, new WorkerInfo());
client2.connectAllAddresses(addresses);
- NettyClient client3 = new NettyClient(context, conf);
+ NettyClient client3 = new NettyClient(context, conf, new WorkerInfo());
client3.connectAllAddresses(addresses);
client1.stop();
Modified: giraph/trunk/giraph/src/test/java/org/apache/giraph/comm/RequestFailureTest.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/test/java/org/apache/giraph/comm/RequestFailureTest.java?rev=1408926&r1=1408925&r2=1408926&view=diff
==============================================================================
--- giraph/trunk/giraph/src/test/java/org/apache/giraph/comm/RequestFailureTest.java (original)
+++ giraph/trunk/giraph/src/test/java/org/apache/giraph/comm/RequestFailureTest.java Tue Nov 13 20:03:07 2012
@@ -128,29 +128,7 @@ public class RequestFailureTest {
@Test
public void send2Requests() throws IOException {
- // Start the service
- serverData = MockUtils.createNewServerData(conf, context);
- server = new NettyServer(conf,
- new WorkerRequestServerHandler.Factory(serverData));
- server.start();
- client = new NettyClient(context, conf);
- client.connectAllAddresses(
- Lists.<WorkerInfo>newArrayList(
- new WorkerInfo(server.getMyAddress(), -1)));
-
- // Send the request 2x
- WritableRequest request1 = getRequest();
- WritableRequest request2 = getRequest();
- client.sendWritableRequest(-1, request1);
- client.sendWritableRequest(-1, request2);
- client.waitAllRequests();
-
- // Stop the service
- client.stop();
- server.stop();
-
- // Check the output (should have been only processed once)
- checkResult(2);
+ checkSendingTwoRequests();
}
@Test
@@ -162,29 +140,7 @@ public class RequestFailureTest {
// Loop every 2 seconds
conf.setInt(GiraphConfiguration.WAITING_REQUEST_MSECS, 2000);
- // Start the service
- serverData = MockUtils.createNewServerData(conf, context);
- server = new NettyServer(conf,
- new WorkerRequestServerHandler.Factory(serverData));
- server.start();
- client = new NettyClient(context, conf);
- client.connectAllAddresses(
- Lists.<WorkerInfo>newArrayList(
- new WorkerInfo(server.getMyAddress(), -1)));
-
- // Send the request 2x, but should only be processed once
- WritableRequest request1 = getRequest();
- WritableRequest request2 = getRequest();
- client.sendWritableRequest(-1, request1);
- client.sendWritableRequest(-1, request2);
- client.waitAllRequests();
-
- // Stop the service
- client.stop();
- server.stop();
-
- // Check the output (should have been only processed once)
- checkResult(2);
+ checkSendingTwoRequests();
}
@Test
@@ -196,21 +152,26 @@ public class RequestFailureTest {
// Loop every 2 seconds
conf.setInt(GiraphConfiguration.WAITING_REQUEST_MSECS, 2000);
+ checkSendingTwoRequests();
+ }
+
+ private void checkSendingTwoRequests() throws IOException {
// Start the service
serverData = MockUtils.createNewServerData(conf, context);
+ WorkerInfo workerInfo = new WorkerInfo(-1);
server = new NettyServer(conf,
- new WorkerRequestServerHandler.Factory(serverData));
+ new WorkerRequestServerHandler.Factory(serverData), workerInfo);
server.start();
- client = new NettyClient(context, conf);
+ workerInfo.setInetSocketAddress(server.getMyAddress());
+ client = new NettyClient(context, conf, new WorkerInfo());
client.connectAllAddresses(
- Lists.<WorkerInfo>newArrayList(
- new WorkerInfo(server.getMyAddress(), -1)));
+ Lists.<WorkerInfo>newArrayList(workerInfo));
// Send the request 2x, but should only be processed once
WritableRequest request1 = getRequest();
WritableRequest request2 = getRequest();
- client.sendWritableRequest(-1, request1);
- client.sendWritableRequest(-1, request2);
+ client.sendWritableRequest(workerInfo.getTaskId(), request1);
+ client.sendWritableRequest(workerInfo.getTaskId(), request2);
client.waitAllRequests();
// Stop the service
Modified: giraph/trunk/giraph/src/test/java/org/apache/giraph/comm/RequestTest.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/test/java/org/apache/giraph/comm/RequestTest.java?rev=1408926&r1=1408925&r2=1408926&view=diff
==============================================================================
--- giraph/trunk/giraph/src/test/java/org/apache/giraph/comm/RequestTest.java (original)
+++ giraph/trunk/giraph/src/test/java/org/apache/giraph/comm/RequestTest.java Tue Nov 13 20:03:07 2012
@@ -67,6 +67,8 @@ public class RequestTest {
private NettyServer server;
/** Client */
private NettyClient client;
+ /** Worker info */
+ private WorkerInfo workerInfo;
/**
* Only for testing.
@@ -92,13 +94,14 @@ public class RequestTest {
// Start the service
serverData = MockUtils.createNewServerData(conf, context);
+ workerInfo = new WorkerInfo(-1);
server = new NettyServer(conf,
- new WorkerRequestServerHandler.Factory(serverData));
+ new WorkerRequestServerHandler.Factory(serverData), workerInfo);
server.start();
- client = new NettyClient(context, conf);
+ workerInfo.setInetSocketAddress(server.getMyAddress());
+ client = new NettyClient(context, conf, new WorkerInfo());
client.connectAllAddresses(
- Lists.<WorkerInfo>newArrayList(
- new WorkerInfo(server.getMyAddress(), -1)));
+ Lists.<WorkerInfo>newArrayList(workerInfo));
}
@Test
@@ -120,7 +123,7 @@ public class RequestTest {
IntWritable> request =
new SendVertexRequest<IntWritable, IntWritable,
IntWritable, IntWritable>(partitionId, vertices);
- client.sendWritableRequest(-1, request);
+ client.sendWritableRequest(workerInfo.getTaskId(), request);
client.waitAllRequests();
// Stop the service
@@ -165,7 +168,7 @@ public class RequestTest {
IntWritable> request =
new SendWorkerMessagesRequest<IntWritable, IntWritable,
IntWritable, IntWritable>(dataToSend);
- client.sendWritableRequest(-1, request);
+ client.sendWritableRequest(workerInfo.getTaskId(), request);
client.waitAllRequests();
// Stop the service
@@ -228,7 +231,7 @@ public class RequestTest {
IntWritable> request =
new SendPartitionMutationsRequest<IntWritable, IntWritable,
IntWritable, IntWritable>(partitionId, vertexIdMutations);
- client.sendWritableRequest(-1, request);
+ client.sendWritableRequest(workerInfo.getTaskId(), request);
client.waitAllRequests();
// Stop the service
Modified: giraph/trunk/giraph/src/test/java/org/apache/giraph/comm/SaslConnectionTest.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/test/java/org/apache/giraph/comm/SaslConnectionTest.java?rev=1408926&r1=1408925&r2=1408926&view=diff
==============================================================================
--- giraph/trunk/giraph/src/test/java/org/apache/giraph/comm/SaslConnectionTest.java (original)
+++ giraph/trunk/giraph/src/test/java/org/apache/giraph/comm/SaslConnectionTest.java Tue Nov 13 20:03:07 2012
@@ -83,16 +83,17 @@ public class SaslConnectionTest {
when(mockedSaslServerFactory.newHandler(conf)).
thenReturn(mockedSaslServerHandler);
+ WorkerInfo workerInfo = new WorkerInfo(-1);
NettyServer server =
new NettyServer(conf,
new WorkerRequestServerHandler.Factory(serverData),
+ workerInfo,
mockedSaslServerFactory);
server.start();
+ workerInfo.setInetSocketAddress(server.getMyAddress());
- NettyClient client = new NettyClient(context, conf);
- client.connectAllAddresses(
- Lists.<WorkerInfo>newArrayList(
- new WorkerInfo(server.getMyAddress(), -1)));
+ NettyClient client = new NettyClient(context, conf, new WorkerInfo());
+ client.connectAllAddresses(Lists.<WorkerInfo>newArrayList(workerInfo));
client.stop();
server.stop();