You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ac...@apache.org on 2012/09/25 19:40:21 UTC
svn commit: r1390014 [2/4] - in /giraph/trunk: ./
src/main/java/org/apache/giraph/ src/main/java/org/apache/giraph/benchmark/
src/main/java/org/apache/giraph/bsp/ src/main/java/org/apache/giraph/comm/
src/main/java/org/apache/giraph/comm/messages/ src/...
Modified: giraph/trunk/src/main/java/org/apache/giraph/comm/netty/NettyClient.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/netty/NettyClient.java?rev=1390014&r1=1390013&r2=1390014&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/netty/NettyClient.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/netty/NettyClient.java Tue Sep 25 17:40:18 2012
@@ -31,6 +31,8 @@ import java.util.concurrent.ExecutorServ
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.giraph.GiraphConfiguration;
+import org.apache.giraph.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.comm.netty.handler.AddressRequestIdGenerator;
import org.apache.giraph.comm.netty.handler.ClientRequestId;
import org.apache.giraph.comm.netty.handler.RequestServerHandler;
@@ -38,9 +40,7 @@ import org.apache.giraph.comm.netty.hand
import org.apache.giraph.comm.netty.handler.RequestEncoder;
import org.apache.giraph.comm.netty.handler.RequestInfo;
import org.apache.giraph.comm.requests.WritableRequest;
-import org.apache.giraph.graph.GiraphJob;
import org.apache.giraph.utils.TimedLogger;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.log4j.Logger;
import org.jboss.netty.bootstrap.ClientBootstrap;
@@ -125,17 +125,20 @@ public class NettyClient {
* Only constructor
*
* @param context Context for progress
+ * @param conf Configuration
*/
- public NettyClient(Mapper<?, ?, ?, ?>.Context context) {
+ public NettyClient(Mapper<?, ?, ?, ?>.Context context,
+ final ImmutableClassesGiraphConfiguration conf) {
this.context = context;
- final Configuration conf = context.getConfiguration();
this.channelsPerServer = conf.getInt(
- GiraphJob.CHANNELS_PER_SERVER,
- GiraphJob.DEFAULT_CHANNELS_PER_SERVER);
- sendBufferSize = conf.getInt(GiraphJob.CLIENT_SEND_BUFFER_SIZE,
- GiraphJob.DEFAULT_CLIENT_SEND_BUFFER_SIZE);
- receiveBufferSize = conf.getInt(GiraphJob.CLIENT_RECEIVE_BUFFER_SIZE,
- GiraphJob.DEFAULT_CLIENT_RECEIVE_BUFFER_SIZE);
+ GiraphConfiguration.CHANNELS_PER_SERVER,
+ GiraphConfiguration.DEFAULT_CHANNELS_PER_SERVER);
+ sendBufferSize = conf.getInt(
+ GiraphConfiguration.CLIENT_SEND_BUFFER_SIZE,
+ GiraphConfiguration.DEFAULT_CLIENT_SEND_BUFFER_SIZE);
+ receiveBufferSize = conf.getInt(
+ GiraphConfiguration.CLIENT_RECEIVE_BUFFER_SIZE,
+ GiraphConfiguration.DEFAULT_CLIENT_RECEIVE_BUFFER_SIZE);
limitNumberOfOpenRequests = conf.getBoolean(
LIMIT_NUMBER_OF_OPEN_REQUESTS,
@@ -153,22 +156,22 @@ public class NettyClient {
}
maxRequestMilliseconds = conf.getInt(
- GiraphJob.MAX_REQUEST_MILLISECONDS,
- GiraphJob.MAX_REQUEST_MILLISECONDS_DEFAULT);
+ GiraphConfiguration.MAX_REQUEST_MILLISECONDS,
+ GiraphConfiguration.MAX_REQUEST_MILLISECONDS_DEFAULT);
maxConnectionFailures = conf.getInt(
- GiraphJob.NETTY_MAX_CONNECTION_FAILURES,
- GiraphJob.NETTY_MAX_CONNECTION_FAILURES_DEFAULT);
+ GiraphConfiguration.NETTY_MAX_CONNECTION_FAILURES,
+ GiraphConfiguration.NETTY_MAX_CONNECTION_FAILURES_DEFAULT);
maxReconnectionFailures = conf.getInt(
- GiraphJob.MAX_RECONNECT_ATTEMPTS,
- GiraphJob.MAX_RECONNECT_ATTEMPTS_DEFAULT);
+ GiraphConfiguration.MAX_RECONNECT_ATTEMPTS,
+ GiraphConfiguration.MAX_RECONNECT_ATTEMPTS_DEFAULT);
waitingRequestMsecs = conf.getInt(
- GiraphJob.WAITING_REQUEST_MSECS,
- GiraphJob.WAITING_REQUEST_MSECS_DEFAULT);
+ GiraphConfiguration.WAITING_REQUEST_MSECS,
+ GiraphConfiguration.WAITING_REQUEST_MSECS_DEFAULT);
- int maxThreads = conf.getInt(GiraphJob.MSG_NUM_FLUSH_THREADS,
+ int maxThreads = conf.getInt(GiraphConfiguration.MSG_NUM_FLUSH_THREADS,
NettyServer.MAXIMUM_THREAD_POOL_SIZE_DEFAULT);
clientRequestIdRequestInfoMap =
new MapMaker().concurrencyLevel(maxThreads).makeMap();
@@ -436,6 +439,7 @@ public class NettyClient {
"have a previous request id = " + request.getRequestId() + ", " +
"request info of " + oldRequestInfo);
}
+
ChannelFuture writeFuture = channel.write(request);
newRequestInfo.setWriteFuture(writeFuture);
Modified: giraph/trunk/src/main/java/org/apache/giraph/comm/netty/NettyMasterClient.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/netty/NettyMasterClient.java?rev=1390014&r1=1390013&r2=1390014&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/netty/NettyMasterClient.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/netty/NettyMasterClient.java Tue Sep 25 17:40:18 2012
@@ -18,6 +18,7 @@
package org.apache.giraph.comm.netty;
+import org.apache.giraph.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.comm.MasterClient;
import org.apache.giraph.graph.WorkerInfo;
import org.apache.hadoop.mapreduce.Mapper;
@@ -43,9 +44,11 @@ public class NettyMasterClient implement
* Constructor
*
* @param context Context from mapper
+ * @param configuration Configuration
*/
- public NettyMasterClient(Mapper<?, ?, ?, ?>.Context context) {
- this.nettyClient = new NettyClient(context);
+ public NettyMasterClient(Mapper<?, ?, ?, ?>.Context context,
+ ImmutableClassesGiraphConfiguration configuration) {
+ this.nettyClient = new NettyClient(context, configuration);
workers = Lists.newArrayList();
}
Modified: giraph/trunk/src/main/java/org/apache/giraph/comm/netty/NettyMasterClientServer.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/netty/NettyMasterClientServer.java?rev=1390014&r1=1390013&r2=1390014&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/netty/NettyMasterClientServer.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/netty/NettyMasterClientServer.java Tue Sep 25 17:40:18 2012
@@ -18,6 +18,7 @@
package org.apache.giraph.comm.netty;
+import org.apache.giraph.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.comm.MasterClient;
import org.apache.giraph.comm.MasterClientServer;
import org.apache.giraph.comm.MasterServer;
@@ -39,10 +40,13 @@ public class NettyMasterClientServer imp
* Constructor
*
* @param context Mapper context
+ * @param configuration Configuration
*/
- public NettyMasterClientServer(Mapper<?, ?, ?, ?>.Context context) {
- client = new NettyMasterClient(context);
- server = new NettyMasterServer(context.getConfiguration());
+ public NettyMasterClientServer(
+ Mapper<?, ?, ?, ?>.Context context,
+ ImmutableClassesGiraphConfiguration configuration) {
+ client = new NettyMasterClient(context, configuration);
+ server = new NettyMasterServer(configuration);
}
@Override
Modified: giraph/trunk/src/main/java/org/apache/giraph/comm/netty/NettyMasterServer.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/netty/NettyMasterServer.java?rev=1390014&r1=1390013&r2=1390014&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/netty/NettyMasterServer.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/netty/NettyMasterServer.java Tue Sep 25 17:40:18 2012
@@ -18,9 +18,9 @@
package org.apache.giraph.comm.netty;
+import org.apache.giraph.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.comm.netty.handler.MasterRequestServerHandler;
import org.apache.giraph.comm.MasterServer;
-import org.apache.hadoop.conf.Configuration;
import java.net.InetSocketAddress;
@@ -36,7 +36,7 @@ public class NettyMasterServer implement
*
* @param conf Hadoop configuration
*/
- public NettyMasterServer(Configuration conf) {
+ public NettyMasterServer(ImmutableClassesGiraphConfiguration conf) {
nettyServer = new NettyServer(conf,
new MasterRequestServerHandler.Factory());
nettyServer.start();
Modified: giraph/trunk/src/main/java/org/apache/giraph/comm/netty/NettyServer.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/netty/NettyServer.java?rev=1390014&r1=1390013&r2=1390014&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/netty/NettyServer.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/netty/NettyServer.java Tue Sep 25 17:40:18 2012
@@ -24,11 +24,11 @@ import java.net.UnknownHostException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import org.apache.giraph.GiraphConfiguration;
+import org.apache.giraph.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.comm.netty.handler.WorkerRequestReservedMap;
import org.apache.giraph.comm.netty.handler.RequestDecoder;
-import org.apache.giraph.graph.GiraphJob;
import org.apache.giraph.comm.netty.handler.RequestServerHandler;
-import org.apache.hadoop.conf.Configuration;
import org.apache.log4j.Logger;
import org.jboss.netty.bootstrap.ServerBootstrap;
import org.jboss.netty.channel.Channel;
@@ -53,7 +53,7 @@ public class NettyServer {
/** Class logger */
private static final Logger LOG = Logger.getLogger(NettyServer.class);
/** Configuration */
- private final Configuration conf;
+ private final ImmutableClassesGiraphConfiguration conf;
/** Factory of channels */
private ChannelFactory channelFactory;
/** Accepted channels */
@@ -89,15 +89,17 @@ public class NettyServer {
* @param conf Configuration to use
* @param requestServerHandlerFactory Factory for request handlers
*/
- public NettyServer(Configuration conf,
+ public NettyServer(ImmutableClassesGiraphConfiguration conf,
RequestServerHandler.Factory requestServerHandlerFactory) {
this.conf = conf;
this.requestServerHandlerFactory = requestServerHandlerFactory;
- sendBufferSize = conf.getInt(GiraphJob.SERVER_SEND_BUFFER_SIZE,
- GiraphJob.DEFAULT_SERVER_SEND_BUFFER_SIZE);
- receiveBufferSize = conf.getInt(GiraphJob.SERVER_RECEIVE_BUFFER_SIZE,
- GiraphJob.DEFAULT_SERVER_RECEIVE_BUFFER_SIZE);
+ sendBufferSize = conf.getInt(
+ GiraphConfiguration.SERVER_SEND_BUFFER_SIZE,
+ GiraphConfiguration.DEFAULT_SERVER_SEND_BUFFER_SIZE);
+ receiveBufferSize = conf.getInt(
+ GiraphConfiguration.SERVER_RECEIVE_BUFFER_SIZE,
+ GiraphConfiguration.DEFAULT_SERVER_RECEIVE_BUFFER_SIZE);
workerRequestReservedMap = new WorkerRequestReservedMap(conf);
@@ -113,11 +115,12 @@ public class NettyServer {
} catch (UnknownHostException e) {
throw new IllegalStateException("NettyServer: unable to get hostname");
}
- maximumPoolSize = conf.getInt(GiraphJob.MSG_NUM_FLUSH_THREADS,
+ maximumPoolSize = conf.getInt(GiraphConfiguration.MSG_NUM_FLUSH_THREADS,
MAXIMUM_THREAD_POOL_SIZE_DEFAULT);
- tcpBacklog = conf.getInt(GiraphJob.TCP_BACKLOG,
- conf.getInt(GiraphJob.MAX_WORKERS, GiraphJob.TCP_BACKLOG_DEFAULT));
+ tcpBacklog = conf.getInt(GiraphConfiguration.TCP_BACKLOG,
+ conf.getInt(GiraphConfiguration.MAX_WORKERS,
+ GiraphConfiguration.TCP_BACKLOG_DEFAULT));
channelFactory = new NioServerSocketChannelFactory(
bossExecutorService,
@@ -151,19 +154,19 @@ public class NettyServer {
int taskId = conf.getInt("mapred.task.partition", -1);
int numTasks = conf.getInt("mapred.map.tasks", 1);
// number of workers + 1 for master
- int numServers = conf.getInt(GiraphJob.MAX_WORKERS, numTasks) + 1;
+ int numServers = conf.getInt(GiraphConfiguration.MAX_WORKERS, numTasks) + 1;
int portIncrementConstant =
(int) Math.pow(10, Math.ceil(Math.log10(numServers)));
- int bindPort = conf.getInt(GiraphJob.RPC_INITIAL_PORT,
- GiraphJob.RPC_INITIAL_PORT_DEFAULT) +
+ int bindPort = conf.getInt(GiraphConfiguration.RPC_INITIAL_PORT,
+ GiraphConfiguration.RPC_INITIAL_PORT_DEFAULT) +
taskId;
int bindAttempts = 0;
final int maxRpcPortBindAttempts =
- conf.getInt(GiraphJob.MAX_RPC_PORT_BIND_ATTEMPTS,
- GiraphJob.MAX_RPC_PORT_BIND_ATTEMPTS_DEFAULT);
+ conf.getInt(GiraphConfiguration.MAX_RPC_PORT_BIND_ATTEMPTS,
+ GiraphConfiguration.MAX_RPC_PORT_BIND_ATTEMPTS_DEFAULT);
final boolean failFirstPortBindingAttempt =
- conf.getBoolean(GiraphJob.FAIL_FIRST_RPC_PORT_BIND_ATTEMPT,
- GiraphJob.FAIL_FIRST_RPC_PORT_BIND_ATTEMPT_DEFAULT);
+ conf.getBoolean(GiraphConfiguration.FAIL_FIRST_RPC_PORT_BIND_ATTEMPT,
+ GiraphConfiguration.FAIL_FIRST_RPC_PORT_BIND_ATTEMPT_DEFAULT);
// Simple handling of port collisions on the same machine while
// preserving debuggability from the port number alone.
Modified: giraph/trunk/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClient.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClient.java?rev=1390014&r1=1390013&r2=1390014&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClient.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClient.java Tue Sep 25 17:40:18 2012
@@ -20,6 +20,8 @@ package org.apache.giraph.comm.netty;
import com.google.common.collect.Sets;
import java.util.Set;
+import org.apache.giraph.GiraphConfiguration;
+import org.apache.giraph.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.bsp.CentralizedServiceWorker;
import org.apache.giraph.comm.SendMessageCache;
import org.apache.giraph.comm.SendMutationsCache;
@@ -33,13 +35,11 @@ import org.apache.giraph.comm.requests.S
import org.apache.giraph.comm.requests.WorkerRequest;
import org.apache.giraph.comm.requests.WritableRequest;
import org.apache.giraph.graph.Edge;
-import org.apache.giraph.graph.GiraphJob;
import org.apache.giraph.graph.Vertex;
import org.apache.giraph.graph.VertexMutations;
import org.apache.giraph.graph.WorkerInfo;
import org.apache.giraph.graph.partition.Partition;
import org.apache.giraph.graph.partition.PartitionOwner;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Mapper;
@@ -71,7 +71,7 @@ public class NettyWorkerClient<I extends
private static final Logger LOG =
Logger.getLogger(NettyWorkerClient.class);
/** Hadoop configuration */
- private final Configuration conf;
+ private final ImmutableClassesGiraphConfiguration<I, V, E, M> conf;
/** Netty client that does the actual I/O */
private final NettyClient nettyClient;
/** Centralized service, needed to get vertex ranges */
@@ -104,22 +104,27 @@ public class NettyWorkerClient<I extends
* Only constructor.
*
* @param context Context from mapper
+ * @param configuration Configuration
* @param service Used to get partition mapping
* @param serverData Server data (used for local requests)
*/
- public NettyWorkerClient(Mapper<?, ?, ?, ?>.Context context,
- CentralizedServiceWorker<I, V, E, M> service,
- ServerData<I, V, E, M> serverData) {
- this.nettyClient = new NettyClient(context);
- this.conf = context.getConfiguration();
+ public NettyWorkerClient(
+ Mapper<?, ?, ?, ?>.Context context,
+ ImmutableClassesGiraphConfiguration<I, V, E, M> configuration,
+ CentralizedServiceWorker<I, V, E, M> service,
+ ServerData<I, V, E, M> serverData) {
+ this.nettyClient = new NettyClient(context, configuration);
+ this.conf = configuration;
this.service = service;
- maxMessagesPerPartition = conf.getInt(GiraphJob.MSG_SIZE,
- GiraphJob.MSG_SIZE_DEFAULT);
- maxMutationsPerPartition = conf.getInt(GiraphJob.MAX_MUTATIONS_PER_REQUEST,
- GiraphJob.MAX_MUTATIONS_PER_REQUEST_DEFAULT);
+ maxMessagesPerPartition = conf.getInt(
+ GiraphConfiguration.MSG_SIZE,
+ GiraphConfiguration.MSG_SIZE_DEFAULT);
+ maxMutationsPerPartition = conf.getInt(
+ GiraphConfiguration.MAX_MUTATIONS_PER_REQUEST,
+ GiraphConfiguration.MAX_MUTATIONS_PER_REQUEST_DEFAULT);
maxResolveAddressAttempts = conf.getInt(
- GiraphJob.MAX_RESOLVE_ADDRESS_ATTEMPTS,
- GiraphJob.MAX_RESOLVE_ADDRESS_ATTEMPTS_DEFAULT);
+ GiraphConfiguration.MAX_RESOLVE_ADDRESS_ATTEMPTS,
+ GiraphConfiguration.MAX_RESOLVE_ADDRESS_ATTEMPTS_DEFAULT);
sendMessageCache = new SendMessageCache<I, M>(conf);
sendMutationsCache = new SendMutationsCache<I, V, E, M>();
this.serverData = serverData;
@@ -157,8 +162,8 @@ public class NettyWorkerClient<I extends
partitionOwner.getPartitionId()));
}
}
- boolean useNetty = conf.getBoolean(GiraphJob.USE_NETTY,
- GiraphJob.USE_NETTY_DEFAULT);
+ boolean useNetty = conf.getBoolean(GiraphConfiguration.USE_NETTY,
+ GiraphConfiguration.USE_NETTY_DEFAULT);
if (useNetty) {
addresses.add(service.getMasterInfo().getInetSocketAddress());
}
Modified: giraph/trunk/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientServer.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientServer.java?rev=1390014&r1=1390013&r2=1390014&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientServer.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientServer.java Tue Sep 25 17:40:18 2012
@@ -18,6 +18,7 @@
package org.apache.giraph.comm.netty;
+import org.apache.giraph.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.bsp.CentralizedServiceWorker;
import org.apache.giraph.comm.ServerData;
import org.apache.giraph.comm.WorkerClient;
@@ -54,13 +55,18 @@ public class NettyWorkerClientServer<I e
* Constructor.
*
* @param context Mapper context
+ * @param configuration Configuration
* @param service Service for partition lookup
*/
- public NettyWorkerClientServer(Mapper<?, ?, ?, ?>.Context context,
+ public NettyWorkerClientServer(
+ Mapper<?, ?, ?, ?>.Context context,
+ ImmutableClassesGiraphConfiguration<I, V, E, M> configuration,
CentralizedServiceWorker<I, V, E, M> service) {
- server = new NettyWorkerServer<I, V, E, M>(context.getConfiguration(),
+ server = new NettyWorkerServer<I, V, E, M>(
+ configuration,
service);
- client = new NettyWorkerClient<I, V, E, M>(context, service,
+ client = new NettyWorkerClient<I, V, E, M>(context,
+ configuration, service,
((NettyWorkerServer<I, V, E, M>) server).getServerData());
}
Modified: giraph/trunk/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java?rev=1390014&r1=1390013&r2=1390014&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java Tue Sep 25 17:40:18 2012
@@ -18,6 +18,8 @@
package org.apache.giraph.comm.netty;
+import org.apache.giraph.GiraphConfiguration;
+import org.apache.giraph.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.bsp.CentralizedServiceWorker;
import org.apache.giraph.comm.ServerData;
import org.apache.giraph.comm.netty.handler.WorkerRequestServerHandler;
@@ -30,13 +32,10 @@ import org.apache.giraph.comm.messages.M
import org.apache.giraph.comm.messages.MessageStoreFactory;
import org.apache.giraph.comm.messages.SequentialFileMessageStore;
import org.apache.giraph.comm.messages.SimpleMessageStore;
-import org.apache.giraph.graph.BspUtils;
-import org.apache.giraph.graph.GiraphJob;
import org.apache.giraph.graph.Vertex;
import org.apache.giraph.graph.VertexMutations;
import org.apache.giraph.graph.VertexResolver;
import org.apache.giraph.graph.partition.Partition;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.log4j.Logger;
@@ -62,7 +61,7 @@ public class NettyWorkerServer<I extends
private static final Logger LOG =
Logger.getLogger(NettyWorkerServer.class);
/** Hadoop configuration */
- private final Configuration conf;
+ private final ImmutableClassesGiraphConfiguration<I, V, E, M> conf;
/** Service worker */
private final CentralizedServiceWorker<I, V, E, M> service;
/** Netty server that does the actual I/O */
@@ -76,20 +75,21 @@ public class NettyWorkerServer<I extends
* @param conf Configuration
* @param service Service to get partition mappings
*/
- public NettyWorkerServer(Configuration conf,
+ public NettyWorkerServer(ImmutableClassesGiraphConfiguration conf,
CentralizedServiceWorker<I, V, E, M> service) {
this.conf = conf;
this.service = service;
boolean useOutOfCoreMessaging = conf.getBoolean(
- GiraphJob.USE_OUT_OF_CORE_MESSAGES,
- GiraphJob.USE_OUT_OF_CORE_MESSAGES_DEFAULT);
+ GiraphConfiguration.USE_OUT_OF_CORE_MESSAGES,
+ GiraphConfiguration.USE_OUT_OF_CORE_MESSAGES_DEFAULT);
if (!useOutOfCoreMessaging) {
serverData = new ServerData<I, V, E, M>(
conf, SimpleMessageStore.newFactory(service, conf));
} else {
- int maxMessagesInMemory = conf.getInt(GiraphJob.MAX_MESSAGES_IN_MEMORY,
- GiraphJob.MAX_MESSAGES_IN_MEMORY_DEFAULT);
+ int maxMessagesInMemory = conf.getInt(
+ GiraphConfiguration.MAX_MESSAGES_IN_MEMORY,
+ GiraphConfiguration.MAX_MESSAGES_IN_MEMORY_DEFAULT);
MessageStoreFactory<I, M, BasicMessageStore<I, M>> fileStoreFactory =
SequentialFileMessageStore.newFactory(conf);
MessageStoreFactory<I, M, FlushableMessageStore<I, M>>
@@ -143,8 +143,8 @@ public class NettyWorkerServer<I extends
// Resolve all graph mutations
for (I vertexIndex : resolveVertexIndexSet) {
VertexResolver<I, V, E, M> vertexResolver =
- BspUtils.createVertexResolver(
- conf, service.getGraphMapper().getGraphState());
+ conf.createVertexResolver(
+ service.getGraphMapper().getGraphState());
Vertex<I, V, E, M> originalVertex =
service.getVertex(vertexIndex);
Modified: giraph/trunk/src/main/java/org/apache/giraph/comm/netty/handler/RequestDecoder.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/netty/handler/RequestDecoder.java?rev=1390014&r1=1390013&r2=1390014&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/netty/handler/RequestDecoder.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/netty/handler/RequestDecoder.java Tue Sep 25 17:40:18 2012
@@ -18,11 +18,12 @@
package org.apache.giraph.comm.netty.handler;
+import org.apache.giraph.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.comm.netty.ByteCounter;
import org.apache.giraph.comm.requests.RequestType;
import org.apache.giraph.comm.requests.WritableRequest;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.giraph.utils.ReflectionUtils;
+
import org.apache.log4j.Logger;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBufferInputStream;
@@ -38,7 +39,7 @@ public class RequestDecoder extends OneT
private static final Logger LOG =
Logger.getLogger(RequestDecoder.class);
/** Configuration */
- private final Configuration conf;
+ private final ImmutableClassesGiraphConfiguration conf;
/** Byte counter to output */
private final ByteCounter byteCounter;
@@ -48,7 +49,8 @@ public class RequestDecoder extends OneT
* @param conf Configuration
* @param byteCounter Keeps track of the decoded bytes
*/
- public RequestDecoder(Configuration conf, ByteCounter byteCounter) {
+ public RequestDecoder(ImmutableClassesGiraphConfiguration conf,
+ ByteCounter byteCounter) {
this.conf = conf;
this.byteCounter = byteCounter;
}
Modified: giraph/trunk/src/main/java/org/apache/giraph/comm/netty/handler/RequestServerHandler.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/netty/handler/RequestServerHandler.java?rev=1390014&r1=1390013&r2=1390014&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/netty/handler/RequestServerHandler.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/netty/handler/RequestServerHandler.java Tue Sep 25 17:40:18 2012
@@ -18,8 +18,8 @@
package org.apache.giraph.comm.netty.handler;
+import org.apache.giraph.GiraphConfiguration;
import org.apache.giraph.comm.requests.WritableRequest;
-import org.apache.giraph.graph.GiraphJob;
import org.apache.hadoop.conf.Configuration;
import org.apache.log4j.Logger;
import org.jboss.netty.buffer.ChannelBuffer;
@@ -62,8 +62,8 @@ public abstract class RequestServerHandl
Configuration conf) {
this.workerRequestReservedMap = workerRequestReservedMap;
closeFirstRequest = conf.getBoolean(
- GiraphJob.NETTY_SIMULATE_FIRST_REQUEST_CLOSED,
- GiraphJob.NETTY_SIMULATE_FIRST_REQUEST_CLOSED_DEFAULT);
+ GiraphConfiguration.NETTY_SIMULATE_FIRST_REQUEST_CLOSED,
+ GiraphConfiguration.NETTY_SIMULATE_FIRST_REQUEST_CLOSED_DEFAULT);
myWorkerId = conf.getInt("mapred.task.partition", -1);
}
Modified: giraph/trunk/src/main/java/org/apache/giraph/comm/netty/handler/ResponseClientHandler.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/netty/handler/ResponseClientHandler.java?rev=1390014&r1=1390013&r2=1390014&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/netty/handler/ResponseClientHandler.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/netty/handler/ResponseClientHandler.java Tue Sep 25 17:40:18 2012
@@ -18,7 +18,7 @@
package org.apache.giraph.comm.netty.handler;
-import org.apache.giraph.graph.GiraphJob;
+import org.apache.giraph.GiraphConfiguration;
import org.apache.hadoop.conf.Configuration;
import org.apache.log4j.Logger;
import org.jboss.netty.buffer.ChannelBuffer;
@@ -60,8 +60,8 @@ public class ResponseClientHandler exten
Configuration conf) {
this.workerIdOutstandingRequestMap = workerIdOutstandingRequestMap;
dropFirstResponse = conf.getBoolean(
- GiraphJob.NETTY_SIMULATE_FIRST_RESPONSE_FAILED,
- GiraphJob.NETTY_SIMULATE_FIRST_RESPONSE_FAILED_DEFAULT);
+ GiraphConfiguration.NETTY_SIMULATE_FIRST_RESPONSE_FAILED,
+ GiraphConfiguration.NETTY_SIMULATE_FIRST_RESPONSE_FAILED_DEFAULT);
}
@Override
Modified: giraph/trunk/src/main/java/org/apache/giraph/comm/netty/handler/WorkerRequestReservedMap.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/netty/handler/WorkerRequestReservedMap.java?rev=1390014&r1=1390013&r2=1390014&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/netty/handler/WorkerRequestReservedMap.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/netty/handler/WorkerRequestReservedMap.java Tue Sep 25 17:40:18 2012
@@ -21,9 +21,9 @@ package org.apache.giraph.comm.netty.han
import com.google.common.collect.MapMaker;
import java.util.concurrent.ConcurrentMap;
+import org.apache.giraph.GiraphConfiguration;
import org.apache.giraph.utils.IncreasingBitSet;
import org.apache.giraph.comm.netty.NettyServer;
-import org.apache.giraph.graph.GiraphJob;
import org.apache.hadoop.conf.Configuration;
/**
@@ -41,7 +41,7 @@ public class WorkerRequestReservedMap {
*/
public WorkerRequestReservedMap(Configuration conf) {
workerRequestReservedMap = new MapMaker().concurrencyLevel(
- conf.getInt(GiraphJob.MSG_NUM_FLUSH_THREADS,
+ conf.getInt(GiraphConfiguration.MSG_NUM_FLUSH_THREADS,
NettyServer.MAXIMUM_THREAD_POOL_SIZE_DEFAULT)).makeMap();
}
Modified: giraph/trunk/src/main/java/org/apache/giraph/comm/requests/SendPartitionMessagesRequest.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/requests/SendPartitionMessagesRequest.java?rev=1390014&r1=1390013&r2=1390014&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/requests/SendPartitionMessagesRequest.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/requests/SendPartitionMessagesRequest.java Tue Sep 25 17:40:18 2012
@@ -19,7 +19,6 @@
package org.apache.giraph.comm.requests;
import org.apache.giraph.comm.ServerData;
-import org.apache.giraph.graph.BspUtils;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.log4j.Logger;
@@ -46,7 +45,7 @@ import java.util.Map.Entry;
@SuppressWarnings("rawtypes")
public class SendPartitionMessagesRequest<I extends WritableComparable,
V extends Writable, E extends Writable, M extends Writable> extends
- WritableRequest implements WorkerRequest<I, V, E, M> {
+ WritableRequest<I, V, E, M> implements WorkerRequest<I, V, E, M> {
/** Class logger */
private static final Logger LOG =
Logger.getLogger(SendPartitionMessagesRequest.class);
@@ -78,12 +77,12 @@ public class SendPartitionMessagesReques
int vertexIdMessagesSize = input.readInt();
vertexIdMessages = Maps.newHashMapWithExpectedSize(vertexIdMessagesSize);
for (int i = 0; i < vertexIdMessagesSize; ++i) {
- I vertexId = BspUtils.<I>createVertexId(getConf());
+ I vertexId = getConf().createVertexId();
vertexId.readFields(input);
int messageCount = input.readInt();
List<M> messageList = Lists.newArrayListWithCapacity(messageCount);
for (int j = 0; j < messageCount; ++j) {
- M message = BspUtils.<M>createMessageValue(getConf());
+ M message = getConf().createMessageValue();
message.readFields(input);
messageList.add(message);
}
Modified: giraph/trunk/src/main/java/org/apache/giraph/comm/requests/SendPartitionMutationsRequest.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/requests/SendPartitionMutationsRequest.java?rev=1390014&r1=1390013&r2=1390014&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/requests/SendPartitionMutationsRequest.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/requests/SendPartitionMutationsRequest.java Tue Sep 25 17:40:18 2012
@@ -26,7 +26,6 @@ import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.giraph.comm.ServerData;
-import org.apache.giraph.graph.BspUtils;
import org.apache.giraph.graph.VertexMutations;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
@@ -45,7 +44,7 @@ import com.google.common.collect.Maps;
@SuppressWarnings("rawtypes")
public class SendPartitionMutationsRequest<I extends WritableComparable,
V extends Writable, E extends Writable, M extends Writable> extends
- WritableRequest implements WorkerRequest<I, V, E, M> {
+ WritableRequest<I, V, E, M> implements WorkerRequest<I, V, E, M> {
/** Class logger */
private static final Logger LOG =
Logger.getLogger(SendPartitionMutationsRequest.class);
@@ -78,7 +77,7 @@ public class SendPartitionMutationsReque
int vertexIdMutationsSize = input.readInt();
vertexIdMutations = Maps.newHashMapWithExpectedSize(vertexIdMutationsSize);
for (int i = 0; i < vertexIdMutationsSize; ++i) {
- I vertexId = BspUtils.<I>createVertexId(getConf());
+ I vertexId = getConf().createVertexId();
vertexId.readFields(input);
VertexMutations<I, V, E, M> vertexMutations =
new VertexMutations<I, V, E, M>();
Modified: giraph/trunk/src/main/java/org/apache/giraph/comm/requests/SendVertexRequest.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/requests/SendVertexRequest.java?rev=1390014&r1=1390013&r2=1390014&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/requests/SendVertexRequest.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/requests/SendVertexRequest.java Tue Sep 25 17:40:18 2012
@@ -19,7 +19,6 @@
package org.apache.giraph.comm.requests;
import org.apache.giraph.comm.ServerData;
-import org.apache.giraph.graph.BspUtils;
import org.apache.giraph.graph.Vertex;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
@@ -43,7 +42,7 @@ import java.util.Collection;
@SuppressWarnings("rawtypes")
public class SendVertexRequest<I extends WritableComparable,
V extends Writable, E extends Writable, M extends Writable> extends
- WritableRequest implements WorkerRequest<I, V, E, M> {
+ WritableRequest<I, V, E, M> implements WorkerRequest<I, V, E, M> {
/** Class logger */
private static final Logger LOG =
Logger.getLogger(SendVertexRequest.class);
@@ -75,7 +74,7 @@ public class SendVertexRequest<I extends
int verticesCount = input.readInt();
vertices = Lists.newArrayListWithCapacity(verticesCount);
for (int i = 0; i < verticesCount; ++i) {
- Vertex<I, V, E, M> vertex = BspUtils.createVertex(getConf());
+ Vertex<I, V, E, M> vertex = getConf().createVertex();
vertex.readFields(input);
vertices.add(vertex);
}
Modified: giraph/trunk/src/main/java/org/apache/giraph/comm/requests/WritableRequest.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/requests/WritableRequest.java?rev=1390014&r1=1390013&r2=1390014&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/requests/WritableRequest.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/requests/WritableRequest.java Tue Sep 25 17:40:18 2012
@@ -21,16 +21,25 @@ package org.apache.giraph.comm.requests;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
-import org.apache.hadoop.conf.Configurable;
-import org.apache.hadoop.conf.Configuration;
+import org.apache.giraph.ImmutableClassesGiraphConfigurable;
+import org.apache.giraph.ImmutableClassesGiraphConfiguration;
import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
/**
* Interface for requests to implement
+ *
+ * @param <I> Vertex id
+ * @param <V> Vertex data
+ * @param <E> Edge data
+ * @param <M> Message data
*/
-public abstract class WritableRequest implements Writable, Configurable {
+public abstract class WritableRequest<I extends WritableComparable,
+ V extends Writable, E extends Writable, M extends Writable>
+ implements Writable,
+ ImmutableClassesGiraphConfigurable<I, V, E, M> {
/** Configuration */
- private Configuration conf;
+ private ImmutableClassesGiraphConfiguration<I, V, E, M> conf;
/** Client id */
private int clientId = -1;
/** Request id */
@@ -74,12 +83,13 @@ public abstract class WritableRequest im
abstract void writeRequest(DataOutput output) throws IOException;
@Override
- public final Configuration getConf() {
+ public final ImmutableClassesGiraphConfiguration<I, V, E, M> getConf() {
return conf;
}
@Override
- public final void setConf(Configuration conf) {
+ public final void setConf(ImmutableClassesGiraphConfiguration<I, V,
+ E, M> conf) {
this.conf = conf;
}
Modified: giraph/trunk/src/main/java/org/apache/giraph/examples/GeneratedVertexReader.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/examples/GeneratedVertexReader.java?rev=1390014&r1=1390013&r2=1390014&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/examples/GeneratedVertexReader.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/examples/GeneratedVertexReader.java Tue Sep 25 17:40:18 2012
@@ -18,9 +18,9 @@
package org.apache.giraph.examples;
+import org.apache.giraph.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.bsp.BspInputSplit;
import org.apache.giraph.graph.VertexReader;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.InputSplit;
@@ -59,7 +59,7 @@ public abstract class GeneratedVertexRea
/** Reverse the id order? */
protected boolean reverseIdOrder;
/** Saved configuration */
- protected Configuration configuration = null;
+ protected ImmutableClassesGiraphConfiguration configuration = null;
/**
* Default constructor for reflection.
@@ -70,7 +70,8 @@ public abstract class GeneratedVertexRea
@Override
public final void initialize(InputSplit inputSplit,
TaskAttemptContext context) throws IOException {
- configuration = context.getConfiguration();
+ configuration = new ImmutableClassesGiraphConfiguration(
+ context.getConfiguration());
totalRecords = configuration.getLong(
GeneratedVertexReader.READER_VERTICES,
GeneratedVertexReader.DEFAULT_READER_VERTICES);
Modified: giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleCheckpointVertex.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleCheckpointVertex.java?rev=1390014&r1=1390013&r2=1390014&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleCheckpointVertex.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleCheckpointVertex.java Tue Sep 25 17:40:18 2012
@@ -46,9 +46,7 @@ import org.apache.log4j.Logger;
* every iteration to verify that checkpoint restarting works. Fault injection
* can also test automated checkpoint restarts.
*/
-public class SimpleCheckpointVertex extends
- EdgeListVertex<LongWritable, IntWritable, FloatWritable, FloatWritable>
- implements Tool {
+public class SimpleCheckpointVertex implements Tool {
/** Which superstep to cause the worker to fail */
public static final int FAULTING_SUPERSTEP = 4;
/** Vertex id to fault on */
@@ -65,58 +63,64 @@ public class SimpleCheckpointVertex exte
/** Configuration */
private Configuration conf;
- @Override
- public void compute(Iterable<FloatWritable> messages) {
- SimpleCheckpointVertexWorkerContext workerContext =
- (SimpleCheckpointVertexWorkerContext) getWorkerContext();
-
- boolean enableFault = workerContext.getEnableFault();
- int supersteps = workerContext.getSupersteps();
-
- if (enableFault && (getSuperstep() == FAULTING_SUPERSTEP) &&
- (getContext().getTaskAttemptID().getId() == 0) &&
- (getId().get() == FAULTING_VERTEX_ID)) {
- LOG.info("compute: Forced a fault on the first " +
- "attempt of superstep " +
- FAULTING_SUPERSTEP + " and vertex id " +
- FAULTING_VERTEX_ID);
- System.exit(-1);
- }
- if (getSuperstep() > supersteps) {
- voteToHalt();
- return;
- }
- long sumAgg = this.<LongWritable>getAggregatedValue(
- LongSumAggregator.class.getName()).get();
- LOG.info("compute: " + sumAgg);
- aggregate(LongSumAggregator.class.getName(),
- new LongWritable(getId().get()));
- LOG.info("compute: sum = " + sumAgg +
- " for vertex " + getId());
- float msgValue = 0.0f;
- for (FloatWritable message : messages) {
- float curMsgValue = message.get();
- msgValue += curMsgValue;
- LOG.info("compute: got msgValue = " + curMsgValue +
- " for vertex " + getId() +
- " on superstep " + getSuperstep());
- }
- int vertexValue = getValue().get();
- setValue(new IntWritable(vertexValue + (int) msgValue));
- LOG.info("compute: vertex " + getId() +
- " has value " + getValue() +
- " on superstep " + getSuperstep());
- for (Edge<LongWritable, FloatWritable> edge : getEdges()) {
- FloatWritable newEdgeValue = new FloatWritable(edge.getValue().get() +
- (float) vertexValue);
+ /**
+ * Actual computation.
+ */
+ public static class SimpleCheckpointComputation extends
+ EdgeListVertex<LongWritable, IntWritable, FloatWritable, FloatWritable> {
+ @Override
+ public void compute(Iterable<FloatWritable> messages) {
+ SimpleCheckpointVertexWorkerContext workerContext =
+ (SimpleCheckpointVertexWorkerContext) getWorkerContext();
+
+ boolean enableFault = workerContext.getEnableFault();
+ int supersteps = workerContext.getSupersteps();
+
+ if (enableFault && (getSuperstep() == FAULTING_SUPERSTEP) &&
+ (getContext().getTaskAttemptID().getId() == 0) &&
+ (getId().get() == FAULTING_VERTEX_ID)) {
+ LOG.info("compute: Forced a fault on the first " +
+ "attempt of superstep " +
+ FAULTING_SUPERSTEP + " and vertex id " +
+ FAULTING_VERTEX_ID);
+ System.exit(-1);
+ }
+ if (getSuperstep() > supersteps) {
+ voteToHalt();
+ return;
+ }
+ long sumAgg = this.<LongWritable>getAggregatedValue(
+ LongSumAggregator.class.getName()).get();
+ LOG.info("compute: " + sumAgg);
+ aggregate(LongSumAggregator.class.getName(),
+ new LongWritable(getId().get()));
+ LOG.info("compute: sum = " + sumAgg +
+ " for vertex " + getId());
+ float msgValue = 0.0f;
+ for (FloatWritable message : messages) {
+ float curMsgValue = message.get();
+ msgValue += curMsgValue;
+ LOG.info("compute: got msgValue = " + curMsgValue +
+ " for vertex " + getId() +
+ " on superstep " + getSuperstep());
+ }
+ int vertexValue = getValue().get();
+ setValue(new IntWritable(vertexValue + (int) msgValue));
LOG.info("compute: vertex " + getId() +
- " sending edgeValue " + edge.getValue() +
- " vertexValue " + vertexValue +
- " total " + newEdgeValue +
- " to vertex " + edge.getTargetVertexId() +
- " on superstep " + getSuperstep());
- addEdge(edge.getTargetVertexId(), newEdgeValue);
- sendMessage(edge.getTargetVertexId(), newEdgeValue);
+ " has value " + getValue() +
+ " on superstep " + getSuperstep());
+ for (Edge<LongWritable, FloatWritable> edge : getEdges()) {
+ FloatWritable newEdgeValue = new FloatWritable(edge.getValue().get() +
+ (float) vertexValue);
+ LOG.info("compute: vertex " + getId() +
+ " sending edgeValue " + edge.getValue() +
+ " vertexValue " + vertexValue +
+ " total " + newEdgeValue +
+ " to vertex " + edge.getTargetVertexId() +
+ " on superstep " + getSuperstep());
+ addEdge(edge.getTargetVertexId(), newEdgeValue);
+ sendMessage(edge.getTargetVertexId(), newEdgeValue);
+ }
}
}
@@ -212,14 +216,19 @@ public class SimpleCheckpointVertex exte
}
GiraphJob bspJob = new GiraphJob(getConf(), getClass().getName());
- bspJob.setVertexClass(getClass());
- bspJob.setVertexInputFormatClass(GeneratedVertexInputFormat.class);
- bspJob.setVertexOutputFormatClass(IdWithValueTextOutputFormat.class);
- bspJob.setWorkerContextClass(SimpleCheckpointVertexWorkerContext.class);
- bspJob.setMasterComputeClass(SimpleCheckpointVertexMasterCompute.class);
+ bspJob.getConfiguration().setVertexClass(SimpleCheckpointComputation.class);
+ bspJob.getConfiguration().setVertexInputFormatClass(
+ GeneratedVertexInputFormat.class);
+ bspJob.getConfiguration().setVertexOutputFormatClass(
+ IdWithValueTextOutputFormat.class);
+ bspJob.getConfiguration().setWorkerContextClass(
+ SimpleCheckpointVertexWorkerContext.class);
+ bspJob.getConfiguration().setMasterComputeClass(
+ SimpleCheckpointVertexMasterCompute.class);
int minWorkers = Integer.parseInt(cmd.getOptionValue('w'));
int maxWorkers = Integer.parseInt(cmd.getOptionValue('w'));
- bspJob.setWorkerConfiguration(minWorkers, maxWorkers, 100.0f);
+ bspJob.getConfiguration().setWorkerConfiguration(
+ minWorkers, maxWorkers, 100.0f);
FileOutputFormat.setOutputPath(bspJob.getInternalJob(),
new Path(cmd.getOptionValue('o')));
Modified: giraph/trunk/src/main/java/org/apache/giraph/examples/SimplePageRankVertex.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/examples/SimplePageRankVertex.java?rev=1390014&r1=1390013&r2=1390014&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/examples/SimplePageRankVertex.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/examples/SimplePageRankVertex.java Tue Sep 25 17:40:18 2012
@@ -21,7 +21,6 @@ package org.apache.giraph.examples;
import org.apache.giraph.aggregators.DoubleMaxAggregator;
import org.apache.giraph.aggregators.DoubleMinAggregator;
import org.apache.giraph.aggregators.LongSumAggregator;
-import org.apache.giraph.graph.BspUtils;
import org.apache.giraph.graph.DefaultMasterCompute;
import org.apache.giraph.graph.LongDoubleFloatDoubleVertex;
import org.apache.giraph.graph.Vertex;
@@ -183,10 +182,9 @@ public class SimplePageRankVertex extend
@Override
public Vertex<LongWritable, DoubleWritable,
- FloatWritable, DoubleWritable>
- getCurrentVertex() throws IOException {
+ FloatWritable, DoubleWritable> getCurrentVertex() throws IOException {
Vertex<LongWritable, DoubleWritable, FloatWritable, DoubleWritable>
- vertex = BspUtils.createVertex(configuration);
+ vertex = configuration.createVertex();
LongWritable vertexId = new LongWritable(
(inputSplit.getSplitIndex() * totalRecords) + recordsRead);
Modified: giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleSuperstepVertex.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleSuperstepVertex.java?rev=1390014&r1=1390013&r2=1390014&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleSuperstepVertex.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleSuperstepVertex.java Tue Sep 25 17:40:18 2012
@@ -18,7 +18,6 @@
package org.apache.giraph.examples;
-import org.apache.giraph.graph.BspUtils;
import org.apache.giraph.graph.EdgeListVertex;
import org.apache.giraph.graph.Vertex;
import org.apache.giraph.graph.VertexReader;
@@ -70,8 +69,7 @@ public class SimpleSuperstepVertex exten
IntWritable> getCurrentVertex()
throws IOException, InterruptedException {
Vertex<LongWritable, IntWritable, FloatWritable, IntWritable> vertex =
- BspUtils.<LongWritable, IntWritable, FloatWritable,
- IntWritable>createVertex(configuration);
+ configuration.createVertex();
long tmpId = reverseIdOrder ?
((inputSplit.getSplitIndex() + 1) * totalRecords) -
recordsRead - 1 :
Modified: giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleVertexWithWorkerContext.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleVertexWithWorkerContext.java?rev=1390014&r1=1390013&r2=1390014&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleVertexWithWorkerContext.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleVertexWithWorkerContext.java Tue Sep 25 17:40:18 2012
@@ -42,26 +42,43 @@ import java.io.IOException;
* emit worker data to HDFS during a graph
* computation.
*/
-public class SimpleVertexWithWorkerContext extends
- EdgeListVertex<LongWritable, IntWritable, FloatWritable, DoubleWritable>
- implements Tool {
+public class SimpleVertexWithWorkerContext implements Tool {
/** Directory name of where to write. */
public static final String OUTPUTDIR = "svwwc.outputdir";
/** Halting condition for the number of supersteps */
private static final int TESTLENGTH = 30;
+ /** Configuration */
+ private Configuration conf;
@Override
- public void compute(Iterable<DoubleWritable> messages) throws IOException {
+ public void setConf(Configuration conf) {
+ this.conf = conf;
+ }
- long superstep = getSuperstep();
+ @Override
+ public Configuration getConf() {
+ return conf;
+ }
- if (superstep < TESTLENGTH) {
- EmitterWorkerContext emitter =
- (EmitterWorkerContext) getWorkerContext();
- emitter.emit("vertexId=" + getId() +
- " superstep=" + superstep + "\n");
- } else {
- voteToHalt();
+ /**
+ * Actual vertex implementation
+ */
+ public static class SimpleVertex extends
+ EdgeListVertex<LongWritable, IntWritable, FloatWritable,
+ DoubleWritable> {
+ @Override
+ public void compute(Iterable<DoubleWritable> messages) throws IOException {
+
+ long superstep = getSuperstep();
+
+ if (superstep < TESTLENGTH) {
+ EmitterWorkerContext emitter =
+ (EmitterWorkerContext) getWorkerContext();
+ emitter.emit("vertexId=" + getId() +
+ " superstep=" + superstep + "\n");
+ } else {
+ voteToHalt();
+ }
}
}
@@ -152,13 +169,13 @@ public class SimpleVertexWithWorkerConte
"run: Must have 2 arguments <output path> <# of workers>");
}
GiraphJob job = new GiraphJob(getConf(), getClass().getName());
- job.setVertexClass(getClass());
- job.setVertexInputFormatClass(
+ job.getConfiguration().setVertexClass(SimpleVertex.class);
+ job.getConfiguration().setVertexInputFormatClass(
SimpleSuperstepVertexInputFormat.class);
- job.setWorkerContextClass(EmitterWorkerContext.class);
- Configuration conf = job.getConfiguration();
- conf.set(SimpleVertexWithWorkerContext.OUTPUTDIR, args[0]);
- job.setWorkerConfiguration(Integer.parseInt(args[1]),
+ job.getConfiguration().setWorkerContextClass(EmitterWorkerContext.class);
+ job.getConfiguration().set(
+ SimpleVertexWithWorkerContext.OUTPUTDIR, args[0]);
+ job.getConfiguration().setWorkerConfiguration(Integer.parseInt(args[1]),
Integer.parseInt(args[1]),
100.0f);
if (job.run(true)) {
Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/BspService.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/BspService.java?rev=1390014&r1=1390013&r2=1390014&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/BspService.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/BspService.java Tue Sep 25 17:40:18 2012
@@ -18,13 +18,14 @@
package org.apache.giraph.graph;
+import org.apache.giraph.GiraphConfiguration;
+import org.apache.giraph.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.bsp.CentralizedService;
import org.apache.giraph.graph.partition.GraphPartitionerFactory;
import org.apache.giraph.zk.BspEvent;
import org.apache.giraph.zk.PredicateLock;
import org.apache.giraph.zk.ZooKeeperExt;
import org.apache.giraph.zk.ZooKeeperManager;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Writable;
@@ -239,8 +240,8 @@ public abstract class BspService<I exten
/** Registered list of BspEvents */
private final List<BspEvent> registeredBspEvents =
new ArrayList<BspEvent>();
- /** Configuration of the job*/
- private final Configuration conf;
+ /** Immutable configuration of the job */
+ private final ImmutableClassesGiraphConfiguration<I, V, E, M> conf;
/** Job context (mainly for progress) */
private final Mapper<?, ?, ?, ?>.Context context;
/** Cached superstep (from ZooKeeper) */
@@ -305,10 +306,12 @@ public abstract class BspService<I exten
this.context = context;
this.graphMapper = graphMapper;
- this.conf = context.getConfiguration();
+ this.conf = new ImmutableClassesGiraphConfiguration<I, V, E, M>(
+ context.getConfiguration());
this.jobId = conf.get("mapred.job.id", "Unknown Job");
this.taskPartition = conf.getInt("mapred.task.partition", -1);
- this.restartedSuperstep = conf.getLong(GiraphJob.RESTART_SUPERSTEP,
+ this.restartedSuperstep = conf.getLong(
+ GiraphConfiguration.RESTART_SUPERSTEP,
UNSET_SUPERSTEP);
this.cachedSuperstep = restartedSuperstep;
if ((restartedSuperstep != UNSET_SUPERSTEP) &&
@@ -323,12 +326,11 @@ public abstract class BspService<I exten
throw new RuntimeException(e);
}
this.hostnamePartitionId = hostname + "_" + getTaskPartition();
- this.graphPartitionerFactory =
- BspUtils.<I, V, E, M>createGraphPartitioner(conf);
+ this.graphPartitionerFactory = conf.createGraphPartitioner();
this.checkpointFrequency =
- conf.getInt(GiraphJob.CHECKPOINT_FREQUENCY,
- GiraphJob.CHECKPOINT_FREQUENCY_DEFAULT);
+ conf.getInt(GiraphConfiguration.CHECKPOINT_FREQUENCY,
+ GiraphConfiguration.CHECKPOINT_FREQUENCY_DEFAULT);
basePath = ZooKeeperManager.getBasePath(conf) + BASE_DIR + "/" + jobId;
masterJobStatePath = basePath + MASTER_JOB_STATE_NODE;
@@ -338,10 +340,9 @@ public abstract class BspService<I exten
inputSplitsAllDonePath = basePath + INPUT_SPLITS_ALL_DONE_NODE;
applicationAttemptsPath = basePath + APPLICATION_ATTEMPTS_DIR;
cleanedUpPath = basePath + CLEANED_UP_DIR;
- checkpointBasePath =
- getConfiguration().get(
- GiraphJob.CHECKPOINT_DIRECTORY,
- GiraphJob.CHECKPOINT_DIRECTORY_DEFAULT + "/" + getJobId());
+ checkpointBasePath = getConfiguration().get(
+ GiraphConfiguration.CHECKPOINT_DIRECTORY,
+ GiraphConfiguration.CHECKPOINT_DIRECTORY_DEFAULT + "/" + getJobId());
masterElectionPath = basePath + MASTER_ELECTION_DIR;
currentMasterPath = basePath + CURRENT_MASTER_INFO_NODE;
if (LOG.isInfoEnabled()) {
@@ -621,7 +622,8 @@ public abstract class BspService<I exten
return fs;
}
- public final Configuration getConfiguration() {
+ public final ImmutableClassesGiraphConfiguration<I, V, E, M>
+ getConfiguration() {
return conf;
}
Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceMaster.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceMaster.java?rev=1390014&r1=1390013&r2=1390014&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceMaster.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceMaster.java Tue Sep 25 17:40:18 2012
@@ -19,6 +19,7 @@
package org.apache.giraph.graph;
import com.google.common.collect.Sets;
+import org.apache.giraph.GiraphConfiguration;
import org.apache.giraph.bsp.ApplicationState;
import org.apache.giraph.bsp.BspInputFormat;
import org.apache.giraph.bsp.CentralizedServiceMaster;
@@ -168,21 +169,21 @@ public class BspServiceMaster<I extends
registerBspEvent(superstepStateChanged);
maxWorkers =
- getConfiguration().getInt(GiraphJob.MAX_WORKERS, -1);
+ getConfiguration().getInt(GiraphConfiguration.MAX_WORKERS, -1);
minWorkers =
- getConfiguration().getInt(GiraphJob.MIN_WORKERS, -1);
+ getConfiguration().getInt(GiraphConfiguration.MIN_WORKERS, -1);
minPercentResponded =
- getConfiguration().getFloat(GiraphJob.MIN_PERCENT_RESPONDED,
+ getConfiguration().getFloat(GiraphConfiguration.MIN_PERCENT_RESPONDED,
100.0f);
msecsPollPeriod =
- getConfiguration().getInt(GiraphJob.POLL_MSECS,
- GiraphJob.POLL_MSECS_DEFAULT);
+ getConfiguration().getInt(GiraphConfiguration.POLL_MSECS,
+ GiraphConfiguration.POLL_MSECS_DEFAULT);
maxPollAttempts =
- getConfiguration().getInt(GiraphJob.POLL_ATTEMPTS,
- GiraphJob.POLL_ATTEMPTS_DEFAULT);
+ getConfiguration().getInt(GiraphConfiguration.POLL_ATTEMPTS,
+ GiraphConfiguration.POLL_ATTEMPTS_DEFAULT);
partitionLongTailMinPrint = getConfiguration().getInt(
- GiraphJob.PARTITION_LONG_TAIL_MIN_PRINT,
- GiraphJob.PARTITION_LONG_TAIL_MIN_PRINT_DEFAULT);
+ GiraphConfiguration.PARTITION_LONG_TAIL_MIN_PRINT,
+ GiraphConfiguration.PARTITION_LONG_TAIL_MIN_PRINT_DEFAULT);
masterGraphPartitioner =
getGraphPartitionerFactory().createMasterGraphPartitioner();
}
@@ -242,15 +243,16 @@ public class BspServiceMaster<I extends
*/
private List<InputSplit> generateInputSplits(int numWorkers) {
VertexInputFormat<I, V, E, M> vertexInputFormat =
- BspUtils.<I, V, E, M>createVertexInputFormat(getConfiguration());
+ getConfiguration().createVertexInputFormat();
List<InputSplit> splits;
try {
splits = vertexInputFormat.getSplits(getContext(), numWorkers);
float samplePercent =
getConfiguration().getFloat(
- GiraphJob.INPUT_SPLIT_SAMPLE_PERCENT,
- GiraphJob.INPUT_SPLIT_SAMPLE_PERCENT_DEFAULT);
- if (samplePercent != GiraphJob.INPUT_SPLIT_SAMPLE_PERCENT_DEFAULT) {
+ GiraphConfiguration.INPUT_SPLIT_SAMPLE_PERCENT,
+ GiraphConfiguration.INPUT_SPLIT_SAMPLE_PERCENT_DEFAULT);
+ if (samplePercent !=
+ GiraphConfiguration.INPUT_SPLIT_SAMPLE_PERCENT_DEFAULT) {
int lastIndex = (int) (samplePercent * splits.size() / 100f);
List<InputSplit> sampleSplits = splits.subList(0, lastIndex);
LOG.warn("generateInputSplits: Using sampling - Processing " +
@@ -285,7 +287,7 @@ public class BspServiceMaster<I extends
org.apache.hadoop.mapred.JobClient jobClient =
new org.apache.hadoop.mapred.JobClient(
(org.apache.hadoop.mapred.JobConf)
- getConfiguration());
+ getContext().getConfiguration());
@SuppressWarnings("deprecation")
org.apache.hadoop.mapred.JobID jobId =
org.apache.hadoop.mapred.JobID.forName(getJobId());
@@ -785,10 +787,8 @@ public class BspServiceMaster<I extends
currentMasterTaskPartitionCounter.increment(
getTaskPartition() -
currentMasterTaskPartitionCounter.getValue());
- masterCompute =
- BspUtils.createMasterCompute(getConfiguration());
- aggregatorWriter =
- BspUtils.createAggregatorWriter(getConfiguration());
+ masterCompute = getConfiguration().createMasterCompute();
+ aggregatorWriter = getConfiguration().createAggregatorWriter();
try {
aggregatorWriter.initialize(getContext(),
getApplicationAttempt());
@@ -797,10 +797,9 @@ public class BspServiceMaster<I extends
"Couldn't initialize aggregatorWriter", e);
}
- boolean useNetty = getConfiguration().getBoolean(GiraphJob.USE_NETTY,
- GiraphJob.USE_NETTY_DEFAULT);
- if (useNetty) {
- commService = new NettyMasterClientServer(getContext());
+ if (getConfiguration().getUseNetty()) {
+ commService = new NettyMasterClientServer(
+ getContext(), getConfiguration());
masterInfo = new WorkerInfo(getHostname(), getTaskPartition(),
commService.getMyAddress().getPort());
// write my address to znode so workers could read it
@@ -1415,8 +1414,8 @@ public class BspServiceMaster<I extends
private void cleanUpOldSuperstep(long removeableSuperstep) throws
InterruptedException {
if (!(getConfiguration().getBoolean(
- GiraphJob.KEEP_ZOOKEEPER_DATA,
- GiraphJob.KEEP_ZOOKEEPER_DATA_DEFAULT)) &&
+ GiraphConfiguration.KEEP_ZOOKEEPER_DATA,
+ GiraphConfiguration.KEEP_ZOOKEEPER_DATA_DEFAULT)) &&
(removeableSuperstep >= 0)) {
String oldSuperstepPath =
getSuperstepPath(getApplicationAttempt()) + "/" +
@@ -1470,9 +1469,7 @@ public class BspServiceMaster<I extends
}
}
- boolean useNetty = getConfiguration().getBoolean(GiraphJob.USE_NETTY,
- GiraphJob.USE_NETTY_DEFAULT);
- if (useNetty) {
+ if (getConfiguration().getUseNetty()) {
commService.fixWorkerAddresses(chosenWorkerInfoList);
}
@@ -1702,8 +1699,8 @@ public class BspServiceMaster<I extends
// and the master can do any final cleanup
try {
if (!getConfiguration().getBoolean(
- GiraphJob.KEEP_ZOOKEEPER_DATA,
- GiraphJob.KEEP_ZOOKEEPER_DATA_DEFAULT)) {
+ GiraphConfiguration.KEEP_ZOOKEEPER_DATA,
+ GiraphConfiguration.KEEP_ZOOKEEPER_DATA_DEFAULT)) {
if (LOG.isInfoEnabled()) {
LOG.info("cleanupZooKeeper: Removing the following path " +
"and all children - " + basePath);
@@ -1753,8 +1750,8 @@ public class BspServiceMaster<I extends
cleanUpZooKeeper();
// If desired, cleanup the checkpoint directory
if (getConfiguration().getBoolean(
- GiraphJob.CLEANUP_CHECKPOINTS_AFTER_SUCCESS,
- GiraphJob.CLEANUP_CHECKPOINTS_AFTER_SUCCESS_DEFAULT)) {
+ GiraphConfiguration.CLEANUP_CHECKPOINTS_AFTER_SUCCESS,
+ GiraphConfiguration.CLEANUP_CHECKPOINTS_AFTER_SUCCESS_DEFAULT)) {
boolean success =
getFs().delete(new Path(checkpointBasePath), true);
if (LOG.isInfoEnabled()) {
@@ -1765,9 +1762,7 @@ public class BspServiceMaster<I extends
}
aggregatorWriter.close();
- boolean useNetty = getConfiguration().getBoolean(GiraphJob.USE_NETTY,
- GiraphJob.USE_NETTY_DEFAULT);
- if (useNetty) {
+ if (getConfiguration().getUseNetty()) {
commService.closeConnections();
commService.close();
}
Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java?rev=1390014&r1=1390013&r2=1390014&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java Tue Sep 25 17:40:18 2012
@@ -18,6 +18,7 @@
package org.apache.giraph.graph;
+import org.apache.giraph.GiraphConfiguration;
import org.apache.giraph.bsp.ApplicationState;
import org.apache.giraph.bsp.CentralizedServiceWorker;
import org.apache.giraph.comm.RPCCommunications;
@@ -148,17 +149,19 @@ public class BspServiceWorker<I extends
new GiraphTransferRegulator(getConfiguration());
inputSplitMaxVertices =
getConfiguration().getLong(
- GiraphJob.INPUT_SPLIT_MAX_VERTICES,
- GiraphJob.INPUT_SPLIT_MAX_VERTICES_DEFAULT);
+ GiraphConfiguration.INPUT_SPLIT_MAX_VERTICES,
+ GiraphConfiguration.INPUT_SPLIT_MAX_VERTICES_DEFAULT);
workerGraphPartitioner =
getGraphPartitionerFactory().createWorkerGraphPartitioner();
- boolean useNetty = getConfiguration().getBoolean(GiraphJob.USE_NETTY,
- GiraphJob.USE_NETTY_DEFAULT);
+ boolean useNetty = getConfiguration().getUseNetty();
if (useNetty) {
- commService = new NettyWorkerClientServer<I, V, E, M>(context, this);
+ commService = new NettyWorkerClientServer<I, V, E, M>(
+ context, getConfiguration(), this);
} else {
commService =
- new RPCCommunications<I, V, E, M>(context, this, graphState);
+ new RPCCommunications<I, V, E, M>(context, this,
+ getConfiguration(),
+ graphState);
}
if (LOG.isInfoEnabled()) {
LOG.info("BspServiceWorker: maxVerticesPerTransfer = " +
@@ -173,8 +176,7 @@ public class BspServiceWorker<I extends
graphState.setWorkerCommunications(commService);
this.workerContext =
- BspUtils.createWorkerContext(getConfiguration(),
- graphMapper.getGraphState());
+ getConfiguration().createWorkerContext(graphMapper.getGraphState());
if (useNetty) {
workerPartitionStore = null;
@@ -445,7 +447,7 @@ public class BspServiceWorker<I extends
private VertexEdgeCount readVerticesFromInputSplit(
InputSplit inputSplit) throws IOException, InterruptedException {
VertexInputFormat<I, V, E, M> vertexInputFormat =
- BspUtils.<I, V, E, M>createVertexInputFormat(getConfiguration());
+ getConfiguration().createVertexInputFormat();
VertexReader<I, V, E, M> vertexReader =
vertexInputFormat.createVertexReader(inputSplit, getContext());
vertexReader.initialize(inputSplit, getContext());
@@ -459,8 +461,7 @@ public class BspServiceWorker<I extends
"without an id! - " + readerVertex);
}
if (readerVertex.getValue() == null) {
- readerVertex.setValue(
- BspUtils.<V>createVertexValue(getConfiguration()));
+ readerVertex.setValue(getConfiguration().createVertexValue());
}
PartitionOwner partitionOwner =
workerGraphPartitioner.getPartitionOwner(
@@ -947,9 +948,7 @@ public class BspServiceWorker<I extends
}
- boolean useNetty = getConfiguration().getBoolean(GiraphJob.USE_NETTY,
- GiraphJob.USE_NETTY_DEFAULT);
- if (useNetty) {
+ if (getConfiguration().getUseNetty()) {
// get address of master
WritableUtils.readFieldsFromZnode(getZkExt(), currentMasterPath, false,
null, masterInfo);
@@ -1097,15 +1096,15 @@ public class BspServiceWorker<I extends
* @throws InterruptedException
*/
private void saveVertices() throws IOException, InterruptedException {
- if (getConfiguration().get(GiraphJob.VERTEX_OUTPUT_FORMAT_CLASS) ==
- null) {
- LOG.warn("saveVertices: " + GiraphJob.VERTEX_OUTPUT_FORMAT_CLASS +
+ if (getConfiguration().getVertexOutputFormatClass() == null) {
+ LOG.warn("saveVertices: " +
+ GiraphConfiguration.VERTEX_OUTPUT_FORMAT_CLASS +
" not specified -- there will be no saved output");
return;
}
VertexOutputFormat<I, V, E> vertexOutputFormat =
- BspUtils.<I, V, E>createVertexOutputFormat(getConfiguration());
+ getConfiguration().createVertexOutputFormat();
VertexWriter<I, V, E> vertexWriter =
vertexOutputFormat.createVertexWriter(getContext());
vertexWriter.initialize(getContext());
@@ -1203,8 +1202,7 @@ public class BspServiceWorker<I extends
LOG.warn("storeCheckpoint: Removed file " + verticesFilePath);
}
- boolean useNetty = getConfiguration().getBoolean(GiraphJob.USE_NETTY,
- GiraphJob.USE_NETTY_DEFAULT);
+ boolean useNetty = getConfiguration().getUseNetty();
FSDataOutputStream verticesOutputStream =
getFs().create(verticesFilePath);
ByteArrayOutputStream metadataByteStream = new ByteArrayOutputStream();
@@ -1272,8 +1270,8 @@ public class BspServiceWorker<I extends
@Override
public void loadCheckpoint(long superstep) {
- if (getConfiguration().getBoolean(GiraphJob.USE_NETTY,
- GiraphJob.USE_NETTY_DEFAULT)) {
+ if (getConfiguration().getBoolean(GiraphConfiguration.USE_NETTY,
+ GiraphConfiguration.USE_NETTY_DEFAULT)) {
try {
// clear old message stores
getServerData().getIncomingMessageStore().clearAll();
Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/BspUtils.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/BspUtils.java?rev=1390014&r1=1390013&r2=1390014&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/BspUtils.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/BspUtils.java Tue Sep 25 17:40:18 2012
@@ -18,6 +18,7 @@
package org.apache.giraph.graph;
+import org.apache.giraph.GiraphConfiguration;
import org.apache.giraph.graph.partition.GraphPartitionerFactory;
import org.apache.giraph.graph.partition.HashPartitionerFactory;
import org.apache.giraph.graph.partition.PartitionStats;
@@ -53,7 +54,7 @@ public class BspUtils {
Class<? extends GraphPartitionerFactory<I, V, E, M>>
getGraphPartitionerClass(Configuration conf) {
return (Class<? extends GraphPartitionerFactory<I, V, E, M>>)
- conf.getClass(GiraphJob.GRAPH_PARTITIONER_FACTORY_CLASS,
+ conf.getClass(GiraphConfiguration.GRAPH_PARTITIONER_FACTORY_CLASS,
HashPartitionerFactory.class,
GraphPartitionerFactory.class);
}
@@ -116,7 +117,7 @@ public class BspUtils {
Class<? extends VertexInputFormat<I, V, E, M>>
getVertexInputFormatClass(Configuration conf) {
return (Class<? extends VertexInputFormat<I, V, E, M>>)
- conf.getClass(GiraphJob.VERTEX_INPUT_FORMAT_CLASS,
+ conf.getClass(GiraphConfiguration.VERTEX_INPUT_FORMAT_CLASS,
null,
VertexInputFormat.class);
}
@@ -160,7 +161,7 @@ public class BspUtils {
Class<? extends VertexOutputFormat<I, V, E>>
getVertexOutputFormatClass(Configuration conf) {
return (Class<? extends VertexOutputFormat<I, V, E>>)
- conf.getClass(GiraphJob.VERTEX_OUTPUT_FORMAT_CLASS,
+ conf.getClass(GiraphConfiguration.VERTEX_OUTPUT_FORMAT_CLASS,
null,
VertexOutputFormat.class);
}
@@ -191,7 +192,7 @@ public class BspUtils {
*/
public static Class<? extends AggregatorWriter>
getAggregatorWriterClass(Configuration conf) {
- return conf.getClass(GiraphJob.AGGREGATOR_WRITER_CLASS,
+ return conf.getClass(GiraphConfiguration.AGGREGATOR_WRITER_CLASS,
TextAggregatorWriter.class,
AggregatorWriter.class);
}
@@ -221,7 +222,7 @@ public class BspUtils {
Class<? extends VertexCombiner<I, M>>
getVertexCombinerClass(Configuration conf) {
return (Class<? extends VertexCombiner<I, M>>)
- conf.getClass(GiraphJob.VERTEX_COMBINER_CLASS,
+ conf.getClass(GiraphConfiguration.VERTEX_COMBINER_CLASS,
null,
VertexCombiner.class);
}
@@ -259,7 +260,7 @@ public class BspUtils {
Class<? extends VertexResolver<I, V, E, M>>
getVertexResolverClass(Configuration conf) {
return (Class<? extends VertexResolver<I, V, E, M>>)
- conf.getClass(GiraphJob.VERTEX_RESOLVER_CLASS,
+ conf.getClass(GiraphConfiguration.VERTEX_RESOLVER_CLASS,
VertexResolver.class,
VertexResolver.class);
}
@@ -297,7 +298,7 @@ public class BspUtils {
public static Class<? extends WorkerContext>
getWorkerContextClass(Configuration conf) {
return (Class<? extends WorkerContext>)
- conf.getClass(GiraphJob.WORKER_CONTEXT_CLASS,
+ conf.getClass(GiraphConfiguration.WORKER_CONTEXT_CLASS,
DefaultWorkerContext.class,
WorkerContext.class);
}
@@ -335,7 +336,7 @@ public class BspUtils {
public static Class<? extends MasterCompute>
getMasterComputeClass(Configuration conf) {
return (Class<? extends MasterCompute>)
- conf.getClass(GiraphJob.MASTER_COMPUTE_CLASS,
+ conf.getClass(GiraphConfiguration.MASTER_COMPUTE_CLASS,
DefaultMasterCompute.class,
MasterCompute.class);
}
@@ -370,7 +371,7 @@ public class BspUtils {
E extends Writable, M extends Writable>
Class<? extends Vertex<I, V, E, M>> getVertexClass(Configuration conf) {
return (Class<? extends Vertex<I, V, E, M>>)
- conf.getClass(GiraphJob.VERTEX_CLASS,
+ conf.getClass(GiraphConfiguration.VERTEX_CLASS,
null,
Vertex.class);
}
@@ -405,7 +406,7 @@ public class BspUtils {
@SuppressWarnings("unchecked")
public static <I extends Writable> Class<I>
getVertexIdClass(Configuration conf) {
- return (Class<I>) conf.getClass(GiraphJob.VERTEX_ID_CLASS,
+ return (Class<I>) conf.getClass(GiraphConfiguration.VERTEX_ID_CLASS,
WritableComparable.class);
}
@@ -441,7 +442,7 @@ public class BspUtils {
@SuppressWarnings("unchecked")
public static <V extends Writable> Class<V>
getVertexValueClass(Configuration conf) {
- return (Class<V>) conf.getClass(GiraphJob.VERTEX_VALUE_CLASS,
+ return (Class<V>) conf.getClass(GiraphConfiguration.VERTEX_VALUE_CLASS,
Writable.class);
}
@@ -481,7 +482,7 @@ public class BspUtils {
@SuppressWarnings("unchecked")
public static <E extends Writable> Class<E>
getEdgeValueClass(Configuration conf) {
- return (Class<E>) conf.getClass(GiraphJob.EDGE_VALUE_CLASS,
+ return (Class<E>) conf.getClass(GiraphConfiguration.EDGE_VALUE_CLASS,
Writable.class);
}
@@ -521,7 +522,7 @@ public class BspUtils {
@SuppressWarnings("unchecked")
public static <M extends Writable> Class<M>
getMessageValueClass(Configuration conf) {
- return (Class<M>) conf.getClass(GiraphJob.MESSAGE_VALUE_CLASS,
+ return (Class<M>) conf.getClass(GiraphConfiguration.MESSAGE_VALUE_CLASS,
Writable.class);
}
Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/EdgeListVertex.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/EdgeListVertex.java?rev=1390014&r1=1390013&r2=1390014&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/EdgeListVertex.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/EdgeListVertex.java Tue Sep 25 17:40:18 2012
@@ -121,18 +121,18 @@ public abstract class EdgeListVertex<I e
@Override
public final void readFields(DataInput in) throws IOException {
- I vertexId = BspUtils.<I>createVertexId(getConf());
+ I vertexId = getConf().createVertexId();
vertexId.readFields(in);
- V vertexValue = BspUtils.<V>createVertexValue(getConf());
+ V vertexValue = getConf().createVertexValue();
vertexValue.readFields(in);
super.initialize(vertexId, vertexValue);
int numEdges = in.readInt();
edgeList = Lists.newArrayListWithCapacity(numEdges);
for (int i = 0; i < numEdges; ++i) {
- I targetVertexId = BspUtils.<I>createVertexId(getConf());
+ I targetVertexId = getConf().createVertexId();
targetVertexId.readFields(in);
- E edgeValue = BspUtils.<E>createEdgeValue(getConf());
+ E edgeValue = getConf().createEdgeValue();
edgeValue.readFields(in);
edgeList.add(new Edge<I, E>(targetVertexId, edgeValue));
}
@@ -140,7 +140,7 @@ public abstract class EdgeListVertex<I e
int numMessages = in.readInt();
messageList = Lists.newArrayListWithCapacity(numMessages);
for (int i = 0; i < numMessages; ++i) {
- M message = BspUtils.<M>createMessageValue(getConf());
+ M message = getConf().createMessageValue();
message.readFields(in);
messageList.add(message);
}