You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ac...@apache.org on 2012/09/25 19:40:21 UTC
svn commit: r1390014 [3/4] - in /giraph/trunk: ./
src/main/java/org/apache/giraph/ src/main/java/org/apache/giraph/benchmark/
src/main/java/org/apache/giraph/bsp/ src/main/java/org/apache/giraph/comm/
src/main/java/org/apache/giraph/comm/messages/ src/...
Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java?rev=1390014&r1=1390013&r2=1390014&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java Tue Sep 25 17:40:18 2012
@@ -18,9 +18,10 @@
package org.apache.giraph.graph;
+import org.apache.giraph.GiraphConfiguration;
+import org.apache.giraph.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.bsp.BspInputFormat;
import org.apache.giraph.bsp.BspOutputFormat;
-import org.apache.giraph.graph.partition.GraphPartitionerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.Client;
import org.apache.hadoop.mapreduce.Job;
@@ -38,459 +39,38 @@ public class GiraphJob {
Configuration.addDefaultResource("giraph-site.xml");
}
- /** Vertex class - required */
- public static final String VERTEX_CLASS = "giraph.vertexClass";
- /** VertexInputFormat class - required */
- public static final String VERTEX_INPUT_FORMAT_CLASS =
- "giraph.vertexInputFormatClass";
-
- /** Class for Master - optional */
- public static final String MASTER_COMPUTE_CLASS = "giraph.masterComputeClass";
-
- /** VertexOutputFormat class - optional */
- public static final String VERTEX_OUTPUT_FORMAT_CLASS =
- "giraph.vertexOutputFormatClass";
- /** Vertex combiner class - optional */
- public static final String VERTEX_COMBINER_CLASS =
- "giraph.combinerClass";
- /** Vertex resolver class - optional */
- public static final String VERTEX_RESOLVER_CLASS =
- "giraph.vertexResolverClass";
- /** Graph partitioner factory class - optional */
- public static final String GRAPH_PARTITIONER_FACTORY_CLASS =
- "giraph.graphPartitionerFactoryClass";
-
- /** Vertex index class */
- public static final String VERTEX_ID_CLASS = "giraph.vertexIdClass";
- /** Vertex value class */
- public static final String VERTEX_VALUE_CLASS = "giraph.vertexValueClass";
- /** Edge value class */
- public static final String EDGE_VALUE_CLASS = "giraph.edgeValueClass";
- /** Message value class */
- public static final String MESSAGE_VALUE_CLASS = "giraph.messageValueClass";
- /** Worker context class */
- public static final String WORKER_CONTEXT_CLASS =
- "giraph.workerContextClass";
- /** AggregatorWriter class - optional */
- public static final String AGGREGATOR_WRITER_CLASS =
- "giraph.aggregatorWriterClass";
-
- /**
- * Minimum number of simultaneous workers before this job can run (int)
- */
- public static final String MIN_WORKERS = "giraph.minWorkers";
- /**
- * Maximum number of simultaneous worker tasks started by this job (int).
- */
- public static final String MAX_WORKERS = "giraph.maxWorkers";
-
- /**
- * Separate the workers and the master tasks. This is required
- * to support dynamic recovery. (boolean)
- */
- public static final String SPLIT_MASTER_WORKER =
- "giraph.SplitMasterWorker";
- /**
- * Default on whether to separate the workers and the master tasks.
- * Needs to be "true" to support dynamic recovery.
- */
- public static final boolean SPLIT_MASTER_WORKER_DEFAULT = true;
-
- /** Indicates whether this job is run in an internal unit test */
- public static final String LOCAL_TEST_MODE =
- "giraph.localTestMode";
-
- /** not in local test mode per default */
- public static final boolean LOCAL_TEST_MODE_DEFAULT = false;
-
- /** Override the Hadoop log level and set the desired log level. */
- public static final String LOG_LEVEL = "giraph.logLevel";
- /** Default log level is INFO (same as Hadoop) */
- public static final String LOG_LEVEL_DEFAULT = "info";
-
- /**
- * Minimum percent of the maximum number of workers that have responded
- * in order to continue progressing. (float)
- */
- public static final String MIN_PERCENT_RESPONDED =
- "giraph.minPercentResponded";
- /** Default 100% response rate for workers */
- public static final float MIN_PERCENT_RESPONDED_DEFAULT = 100.0f;
-
- /** Polling timeout to check on the number of responded tasks (int) */
- public static final String POLL_MSECS = "giraph.pollMsecs";
- /** Default poll msecs (30 seconds) */
- public static final int POLL_MSECS_DEFAULT = 30 * 1000;
-
- /**
- * ZooKeeper comma-separated list (if not set,
- * will start up ZooKeeper locally)
- */
- public static final String ZOOKEEPER_LIST = "giraph.zkList";
-
- /** ZooKeeper session millisecond timeout */
- public static final String ZOOKEEPER_SESSION_TIMEOUT =
- "giraph.zkSessionMsecTimeout";
- /** Default Zookeeper session millisecond timeout */
- public static final int ZOOKEEPER_SESSION_TIMEOUT_DEFAULT = 60 * 1000;
-
- /** Polling interval to check for the final ZooKeeper server data */
- public static final String ZOOKEEPER_SERVERLIST_POLL_MSECS =
- "giraph.zkServerlistPollMsecs";
- /** Default polling interval to check for the final ZooKeeper server data */
- public static final int ZOOKEEPER_SERVERLIST_POLL_MSECS_DEFAULT =
- 3 * 1000;
-
- /** Number of nodes (not tasks) to run Zookeeper on */
- public static final String ZOOKEEPER_SERVER_COUNT =
- "giraph.zkServerCount";
- /** Default number of nodes to run Zookeeper on */
- public static final int ZOOKEEPER_SERVER_COUNT_DEFAULT = 1;
-
- /** ZooKeeper port to use */
- public static final String ZOOKEEPER_SERVER_PORT =
- "giraph.zkServerPort";
- /** Default ZooKeeper port to use */
- public static final int ZOOKEEPER_SERVER_PORT_DEFAULT = 22181;
-
- /** Location of the ZooKeeper jar - Used internally, not meant for users */
- public static final String ZOOKEEPER_JAR = "giraph.zkJar";
-
- /** Local ZooKeeper directory to use */
- public static final String ZOOKEEPER_DIR = "giraph.zkDir";
-
- /** Use the RPC communication or netty communication */
- public static final String USE_NETTY = "giraph.useNetty";
- /** Default is to use RPC, not netty */
- public static final boolean USE_NETTY_DEFAULT = false;
-
- /** TCP backlog (defaults to number of workers) */
- public static final String TCP_BACKLOG = "giraph.tcpBacklog";
- /**
- * Default TCP backlog default if the number of workers is not specified
- * (i.e unittests)
- */
- public static final int TCP_BACKLOG_DEFAULT = 1;
-
- /** Netty simulate a first request closed */
- public static final String NETTY_SIMULATE_FIRST_REQUEST_CLOSED =
- "giraph.nettySimulateFirstRequestClosed";
- /** Default of not simulating failure for first request */
- public static final boolean NETTY_SIMULATE_FIRST_REQUEST_CLOSED_DEFAULT =
- false;
-
- /** Netty simulate a first response failed */
- public static final String NETTY_SIMULATE_FIRST_RESPONSE_FAILED =
- "giraph.nettySimulateFirstResponseFailed";
- /** Default of not simulating failure for first reponse */
- public static final boolean NETTY_SIMULATE_FIRST_RESPONSE_FAILED_DEFAULT =
- false;
-
- /** Maximum number of reconnection attempts */
- public static final String MAX_RECONNECT_ATTEMPTS =
- "giraph.maxNumberOfOpenRequests";
- /** Default maximum number of reconnection attempts */
- public static final int MAX_RECONNECT_ATTEMPTS_DEFAULT = 10;
-
- /** Max resolve address attempts */
- public static final String MAX_RESOLVE_ADDRESS_ATTEMPTS =
- "giraph.maxResolveAddressAttempts";
- /** Default max resolve address attempts */
- public static final int MAX_RESOLVE_ADDRESS_ATTEMPTS_DEFAULT = 5;
-
- /** Msecs to wait between waiting for all requests to finish */
- public static final String WAITING_REQUEST_MSECS =
- "giraph.waitingRequestMsecs";
- /** Default msecs to wait between waiting for all requests to finish */
- public static final int WAITING_REQUEST_MSECS_DEFAULT = 15000;
-
- /** Milliseconds for a request to complete (or else resend) */
- public static final String MAX_REQUEST_MILLISECONDS =
- "giraph.maxRequestMilliseconds";
- /** Maximum number of milliseconds for a request to complete (10 minutes) */
- public static final int MAX_REQUEST_MILLISECONDS_DEFAULT = 10 * 60 * 1000;
-
- /** Netty max connection failures */
- public static final String NETTY_MAX_CONNECTION_FAILURES =
- "giraph.nettyMaxConnectionFailures";
- /** Default Netty max connection failures */
- public static final int NETTY_MAX_CONNECTION_FAILURES_DEFAULT = 1000;
-
- /** Initial port to start using for the RPC communication */
- public static final String RPC_INITIAL_PORT = "giraph.rpcInitialPort";
- /** Default port to start using for the RPC communication */
- public static final int RPC_INITIAL_PORT_DEFAULT = 30000;
-
- /** Maximum bind attempts for different RPC ports */
- public static final String MAX_RPC_PORT_BIND_ATTEMPTS =
- "giraph.maxRpcPortBindAttempts";
- /** Default maximum bind attempts for different RPC ports */
- public static final int MAX_RPC_PORT_BIND_ATTEMPTS_DEFAULT = 20;
- /**
- * Fail first RPC port binding attempt, simulate binding failure
- * on real grid testing
- */
- public static final String FAIL_FIRST_RPC_PORT_BIND_ATTEMPT =
- "giraph.failFirstRpcPortBindAttempt";
- /** Default fail first RPC port binding attempt flag */
- public static final boolean FAIL_FIRST_RPC_PORT_BIND_ATTEMPT_DEFAULT = false;
-
- /** Maximum number of RPC handlers */
- public static final String RPC_NUM_HANDLERS = "giraph.rpcNumHandlers";
- /** Default maximum number of RPC handlers */
- public static final int RPC_NUM_HANDLERS_DEFAULT = 100;
-
- /** Client send buffer size */
- public static final String CLIENT_SEND_BUFFER_SIZE =
- "giraph.clientSendBufferSize";
- /** Default client send buffer size of 0.5 MB */
- public static final int DEFAULT_CLIENT_SEND_BUFFER_SIZE = 512 * 1024;
-
- /** Client receive buffer size */
- public static final String CLIENT_RECEIVE_BUFFER_SIZE =
- "giraph.clientReceiveBufferSize";
- /** Default client receive buffer size of 32 k */
- public static final int DEFAULT_CLIENT_RECEIVE_BUFFER_SIZE = 32 * 1024;
-
- /** Server send buffer size */
- public static final String SERVER_SEND_BUFFER_SIZE =
- "giraph.serverSendBufferSize";
- /** Default server send buffer size of 32 k */
- public static final int DEFAULT_SERVER_SEND_BUFFER_SIZE = 32 * 1024;
-
- /** Server receive buffer size */
- public static final String SERVER_RECEIVE_BUFFER_SIZE =
- "giraph.serverReceiveBufferSize";
- /** Default server receive buffer size of 0.5 MB */
- public static final int DEFAULT_SERVER_RECEIVE_BUFFER_SIZE = 512 * 1024;
-
- /** Maximum number of messages per peer before flush */
- public static final String MSG_SIZE = "giraph.msgSize";
- /** Default maximum number of messages per peer before flush */
- public static final int MSG_SIZE_DEFAULT = 2000;
-
- /** Maximum number of mutations per partition before flush */
- public static final String MAX_MUTATIONS_PER_REQUEST =
- "giraph.maxMutationsPerRequest";
- /** Default maximum number of mutations per partition before flush */
- public static final int MAX_MUTATIONS_PER_REQUEST_DEFAULT = 100;
-
- /** Maximum number of messages that can be bulk sent during a flush */
- public static final String MAX_MESSAGES_PER_FLUSH_PUT =
- "giraph.maxMessagesPerFlushPut";
- /** Default number of messages that can be bulk sent during a flush */
- public static final int DEFAULT_MAX_MESSAGES_PER_FLUSH_PUT = 2000;
-
- /** Number of channels used per server */
- public static final String CHANNELS_PER_SERVER =
- "giraph.channelsPerServer";
- /** Default number of channels used per server of 1 */
- public static final int DEFAULT_CHANNELS_PER_SERVER = 1;
-
- /** Number of flush threads per peer */
- public static final String MSG_NUM_FLUSH_THREADS =
- "giraph.msgNumFlushThreads";
-
- /** Number of poll attempts prior to failing the job (int) */
- public static final String POLL_ATTEMPTS = "giraph.pollAttempts";
- /** Default poll attempts */
- public static final int POLL_ATTEMPTS_DEFAULT = 10;
-
- /** Number of minimum vertices in each vertex range */
- public static final String MIN_VERTICES_PER_RANGE =
- "giraph.minVerticesPerRange";
- /** Default number of minimum vertices in each vertex range */
- public static final long MIN_VERTICES_PER_RANGE_DEFAULT = 3;
-
- /** Minimum stragglers of the superstep before printing them out */
- public static final String PARTITION_LONG_TAIL_MIN_PRINT =
- "giraph.partitionLongTailMinPrint";
- /** Only print stragglers with one as a default */
- public static final int PARTITION_LONG_TAIL_MIN_PRINT_DEFAULT = 1;
-
- /** Use superstep counters? (boolean) */
- public static final String USE_SUPERSTEP_COUNTERS =
- "giraph.useSuperstepCounters";
- /** Default is to use the superstep counters */
- public static final boolean USE_SUPERSTEP_COUNTERS_DEFAULT = true;
-
- /**
- * Set the multiplicative factor of how many partitions to create from
- * a single InputSplit based on the number of total InputSplits. For
- * example, if there are 10 total InputSplits and this is set to 0.5, then
- * you will get 0.5 * 10 = 5 partitions for every InputSplit (given that the
- * minimum size is met).
- */
- public static final String TOTAL_INPUT_SPLIT_MULTIPLIER =
- "giraph.totalInputSplitMultiplier";
- /** Default total input split multiplier */
- public static final float TOTAL_INPUT_SPLIT_MULTIPLIER_DEFAULT = 0.5f;
-
- /**
- * Input split sample percent - Used only for sampling and testing, rather
- * than an actual job. The idea is that to test, you might only want a
- * fraction of the actual input splits from your VertexInputFormat to
- * load (values should be [0, 100]).
- */
- public static final String INPUT_SPLIT_SAMPLE_PERCENT =
- "giraph.inputSplitSamplePercent";
- /** Default is to use all the input splits */
- public static final float INPUT_SPLIT_SAMPLE_PERCENT_DEFAULT = 100f;
-
- /**
- * To limit outlier input splits from producing too many vertices or to
- * help with testing, the number of vertices loaded from an input split can
- * be limited. By default, everything is loaded.
- */
- public static final String INPUT_SPLIT_MAX_VERTICES =
- "giraph.InputSplitMaxVertices";
- /**
- * Default is that all the vertices are to be loaded from the input
- * split
- */
- public static final long INPUT_SPLIT_MAX_VERTICES_DEFAULT = -1;
-
- /** Java opts passed to ZooKeeper startup */
- public static final String ZOOKEEPER_JAVA_OPTS =
- "giraph.zkJavaOpts";
- /** Default java opts passed to ZooKeeper startup */
- public static final String ZOOKEEPER_JAVA_OPTS_DEFAULT =
- "-Xmx512m -XX:ParallelGCThreads=4 -XX:+UseConcMarkSweepGC " +
- "-XX:CMSInitiatingOccupancyFraction=70 -XX:MaxGCPauseMillis=100";
-
- /**
- * How often to checkpoint (i.e. 0, means no checkpoint,
- * 1 means every superstep, 2 is every two supersteps, etc.).
- */
- public static final String CHECKPOINT_FREQUENCY =
- "giraph.checkpointFrequency";
-
- /** Default checkpointing frequency of none. */
- public static final int CHECKPOINT_FREQUENCY_DEFAULT = 0;
-
- /**
- * Delete checkpoints after a successful job run?
- */
- public static final String CLEANUP_CHECKPOINTS_AFTER_SUCCESS =
- "giraph.cleanupCheckpointsAfterSuccess";
- /** Default is to clean up the checkponts after a successful job */
- public static final boolean CLEANUP_CHECKPOINTS_AFTER_SUCCESS_DEFAULT =
- true;
-
- /**
- * An application can be restarted manually by selecting a superstep. The
- * corresponding checkpoint must exist for this to work. The user should
- * set a long value. Default is start from scratch.
- */
- public static final String RESTART_SUPERSTEP = "giraph.restartSuperstep";
-
- /**
- * Base ZNode for Giraph's state in the ZooKeeper cluster. Must be a root
- * znode on the cluster beginning with "/"
- */
- public static final String BASE_ZNODE_KEY = "giraph.zkBaseZNode";
-
- /**
- * If ZOOKEEPER_LIST is not set, then use this directory to manage
- * ZooKeeper
- */
- public static final String ZOOKEEPER_MANAGER_DIRECTORY =
- "giraph.zkManagerDirectory";
- /**
- * Default ZooKeeper manager directory (where determining the servers in
- * HDFS files will go). Final directory path will also have job number
- * for uniqueness.
- */
- public static final String ZOOKEEPER_MANAGER_DIR_DEFAULT =
- "_bsp/_defaultZkManagerDir";
-
- /** This directory has/stores the available checkpoint files in HDFS. */
- public static final String CHECKPOINT_DIRECTORY =
- "giraph.checkpointDirectory";
- /**
- * Default checkpoint directory (where checkpoing files go in HDFS). Final
- * directory path will also have the job number for uniqueness
- */
- public static final String CHECKPOINT_DIRECTORY_DEFAULT =
- "_bsp/_checkpoints/";
-
- /** Directory in the local file system for out-of-core messages. */
- public static final String MESSAGES_DIRECTORY = "giraph.messagesDirectory";
- /**
- * Default messages directory. Final directory path will also have the
- * job number for uniqueness
- */
- public static final String MESSAGES_DIRECTORY_DEFAULT = "_bsp/_messages/";
-
- /** Whether or not to use out-of-core messages */
- public static final String USE_OUT_OF_CORE_MESSAGES =
- "giraph.useOutOfCoreMessages";
- /** Default choice about using out-of-core messaging */
- public static final boolean USE_OUT_OF_CORE_MESSAGES_DEFAULT = false;
- /**
- * If using out-of-core messaging, it tells how much messages do we keep
- * in memory.
- */
- public static final String MAX_MESSAGES_IN_MEMORY =
- "giraph.maxMessagesInMemory";
- /** Default maximum number of messages in memory. */
- public static final int MAX_MESSAGES_IN_MEMORY_DEFAULT = 1000000;
- /** Size of buffer when reading and writing messages out-of-core. */
- public static final String MESSAGES_BUFFER_SIZE =
- "giraph.messagesBufferSize";
- /** Default size of buffer when reading and writing messages out-of-core. */
- public static final int MESSAGES_BUFFER_SIZE_DEFAULT = 8192;
-
- /** Directory in the local filesystem for out-of-core partitions. */
- public static final String PARTITIONS_DIRECTORY =
- "giraph.partitionsDirectory";
- /** Default directory for out-of-core partitions. */
- public static final String PARTITIONS_DIRECTORY_DEFAULT = "_bsp/_partitions";
-
- /** Enable out-of-core graph. */
- public static final String USE_OUT_OF_CORE_GRAPH =
- "giraph.useOutOfCoreGraph";
- /** Default is not to use out-of-core graph. */
- public static final boolean USE_OUT_OF_CORE_GRAPH_DEFAULT = false;
-
- /** Maximum number of partitions to hold in memory for each worker. */
- public static final String MAX_PARTITIONS_IN_MEMORY =
- "giraph.maxPartitionsInMemory";
- /** Default maximum number of in-memory partitions. */
- public static final int MAX_PARTITIONS_IN_MEMORY_DEFAULT = 10;
-
- /** Keep the zookeeper output for debugging? Default is to remove it. */
- public static final String KEEP_ZOOKEEPER_DATA =
- "giraph.keepZooKeeperData";
- /** Default is to remove ZooKeeper data. */
- public static final Boolean KEEP_ZOOKEEPER_DATA_DEFAULT = false;
-
- /** Default ZooKeeper tick time. */
- public static final int DEFAULT_ZOOKEEPER_TICK_TIME = 6000;
- /** Default ZooKeeper init limit (in ticks). */
- public static final int DEFAULT_ZOOKEEPER_INIT_LIMIT = 10;
- /** Default ZooKeeper sync limit (in ticks). */
- public static final int DEFAULT_ZOOKEEPER_SYNC_LIMIT = 5;
- /** Default ZooKeeper snap count. */
- public static final int DEFAULT_ZOOKEEPER_SNAP_COUNT = 50000;
- /** Default ZooKeeper maximum client connections. */
- public static final int DEFAULT_ZOOKEEPER_MAX_CLIENT_CNXNS = 10000;
- /** ZooKeeper minimum session timeout */
- public static final String ZOOKEEPER_MIN_SESSION_TIMEOUT =
- "giraph.zookeeperMinSessionTimeout";
- /** Default ZooKeeper minimum session timeout of 5 minutes (in msecs). */
- public static final int DEFAULT_ZOOKEEPER_MIN_SESSION_TIMEOUT = 300 * 1000;
- /** Default ZooKeeper maximum session timeout of 10 minutes (in msecs). */
- public static final int DEFAULT_ZOOKEEPER_MAX_SESSION_TIMEOUT = 600 * 1000;
-
/** Class logger */
private static final Logger LOG = Logger.getLogger(GiraphJob.class);
-
- /** Internal job that actually is submitted */
- private final Job job;
+ /** Internal delegated job to proxy interface requests for Job */
+ private final DelegatedJob delegatedJob;
+ /** Name of the job */
+ private final String jobName;
/** Helper configuration from the job */
- private final Configuration conf;
+ private final GiraphConfiguration giraphConfiguration;
+ /**
+ * Delegated job that simply passes along the class GiraphConfiguration.
+ * While jobInited is false, getConfiguration() falls back to the
+ * configuration held by the superclass Job; once jobInited is true
+ * (set by getInternalJob()), it returns the enclosing GiraphJob's
+ * giraphConfiguration instead.
+ */
+ private class DelegatedJob extends Job {
+ /** Ensure that for job initiation the super.getConfiguration() is used */
+ private boolean jobInited = false;
+
+ /**
+ * Constructor
+ *
+ * @throws IOException
+ */
+ DelegatedJob() throws IOException { }
+
+ /**
+ * Get the configuration exposed by this job.
+ *
+ * @return giraphConfiguration when jobInited is true, otherwise the
+ * configuration of the superclass
+ */
+ @Override
+ public Configuration getConfiguration() {
+ if (jobInited) {
+ return giraphConfiguration;
+ } else {
+ return super.getConfiguration();
+ }
+ }
+ }
/**
* Constructor that will instantiate the configuration
@@ -499,19 +79,33 @@ public class GiraphJob {
* @throws IOException
*/
public GiraphJob(String jobName) throws IOException {
- this(new Configuration(), jobName);
+ this(new GiraphConfiguration(), jobName);
+ }
+
+ /**
+ * Constructor.
+ *
+ * @param configuration User-defined configuration
+ * @param jobName User-defined job name
+ * @throws IOException
+ */
+ public GiraphJob(Configuration configuration,
+ String jobName) throws IOException {
+ this(new GiraphConfiguration(configuration), jobName);
}
/**
* Constructor.
*
- * @param conf User-defined configuration
+ * @param giraphConfiguration User-defined configuration
* @param jobName User-defined job name
* @throws IOException
*/
- public GiraphJob(Configuration conf, String jobName) throws IOException {
- job = new Job(conf, jobName);
- this.conf = job.getConfiguration();
+ public GiraphJob(GiraphConfiguration giraphConfiguration,
+ String jobName) throws IOException {
+ this.jobName = jobName;
+ this.giraphConfiguration = giraphConfiguration;
+ this.delegatedJob = new DelegatedJob();
}
/**
@@ -519,8 +113,8 @@ public class GiraphJob {
*
* @return Configuration used by the job.
*/
- public Configuration getConfiguration() {
- return conf;
+ public GiraphConfiguration getConfiguration() {
+ return giraphConfiguration;
}
/**
@@ -531,176 +125,47 @@ public class GiraphJob {
* @return Internal job that will actually be submitted to Hadoop.
*/
public Job getInternalJob() {
- return job;
+ // After this flag is set, DelegatedJob.getConfiguration() returns
+ // giraphConfiguration rather than the superclass configuration.
+ delegatedJob.jobInited = true;
+ return delegatedJob;
}
/**
* Make sure the configuration is set properly by the user prior to
* submitting the job.
+ *
+ * @param conf Configuration to check
+ * @throws RuntimeException if the maximum number of workers is negative
+ * @throws IllegalArgumentException if the minimum percent responded is
+ * not in the range (0, 100], the minimum number of workers is negative,
+ * or the vertex class or vertex input format class is null
*/
- private void checkConfiguration() {
- if (conf.getInt(MAX_WORKERS, -1) < 0) {
- throw new RuntimeException("No valid " + MAX_WORKERS);
- }
- if (conf.getFloat(MIN_PERCENT_RESPONDED,
- MIN_PERCENT_RESPONDED_DEFAULT) <= 0.0f ||
- conf.getFloat(MIN_PERCENT_RESPONDED,
- MIN_PERCENT_RESPONDED_DEFAULT) > 100.0f) {
- throw new IllegalArgumentException(
- "Invalid " +
- conf.getFloat(MIN_PERCENT_RESPONDED,
- MIN_PERCENT_RESPONDED_DEFAULT) + " for " +
- MIN_PERCENT_RESPONDED);
- }
- if (conf.getInt(MIN_WORKERS, -1) < 0) {
- throw new IllegalArgumentException("No valid " + MIN_WORKERS);
+ private void checkConfiguration(ImmutableClassesGiraphConfiguration conf) {
+ if (conf.getMaxWorkers() < 0) {
+ throw new RuntimeException("checkConfiguration: No valid " +
+ GiraphConfiguration.MAX_WORKERS);
}
- if (BspUtils.getVertexClass(getConfiguration()) == null) {
- throw new IllegalArgumentException("GiraphJob: Null VERTEX_CLASS");
- }
- if (BspUtils.getVertexInputFormatClass(getConfiguration()) == null) {
+ if (conf.getMinPercentResponded() <= 0.0f ||
+ conf.getMinPercentResponded() > 100.0f) {
throw new IllegalArgumentException(
- "GiraphJob: Null VERTEX_INPUT_FORMAT_CLASS");
+ "checkConfiguration: Invalid " + conf.getMinPercentResponded() +
+ " for " + GiraphConfiguration.MIN_PERCENT_RESPONDED);
+ }
+ if (conf.getMinWorkers() < 0) {
+ throw new IllegalArgumentException("checkConfiguration: No valid " +
+ GiraphConfiguration.MIN_WORKERS);
+ }
+ if (conf.getVertexClass() == null) {
+ throw new IllegalArgumentException("checkConfiguration: Null " +
+ GiraphConfiguration.VERTEX_CLASS);
+ }
+ if (conf.getVertexInputFormatClass() == null) {
+ throw new IllegalArgumentException("checkConfiguration: Null " +
+ GiraphConfiguration.VERTEX_INPUT_FORMAT_CLASS);
}
- if (BspUtils.getVertexResolverClass(getConfiguration()) == null) {
- setVertexResolverClass(VertexResolver.class);
+ if (conf.getVertexResolverClass() == null) {
if (LOG.isInfoEnabled()) {
- LOG.info("GiraphJob: No class found for " +
- VERTEX_RESOLVER_CLASS + ", defaulting to " +
+ LOG.info("checkConfiguration: No class found for " +
+ GiraphConfiguration.VERTEX_RESOLVER_CLASS + ", defaulting to " +
VertexResolver.class.getCanonicalName());
}
}
}
- /**
- * Set the vertex class (required)
- *
- * @param vertexClass Runs vertex computation
- */
- public final void setVertexClass(Class<?> vertexClass) {
- getConfiguration().setClass(VERTEX_CLASS, vertexClass, Vertex.class);
- }
-
- /**
- * Set the vertex input format class (required)
- *
- * @param vertexInputFormatClass Determines how graph is input
- */
- public final void setVertexInputFormatClass(
- Class<?> vertexInputFormatClass) {
- getConfiguration().setClass(VERTEX_INPUT_FORMAT_CLASS,
- vertexInputFormatClass,
- VertexInputFormat.class);
- }
-
- /**
- * Set the master class (optional)
- *
- * @param masterComputeClass Runs master computation
- */
- public final void setMasterComputeClass(Class<?> masterComputeClass) {
- getConfiguration().setClass(MASTER_COMPUTE_CLASS, masterComputeClass,
- MasterCompute.class);
- }
-
- /**
- * Set the vertex output format class (optional)
- *
- * @param vertexOutputFormatClass Determines how graph is output
- */
- public final void setVertexOutputFormatClass(
- Class<?> vertexOutputFormatClass) {
- getConfiguration().setClass(VERTEX_OUTPUT_FORMAT_CLASS,
- vertexOutputFormatClass,
- VertexOutputFormat.class);
- }
-
- /**
- * Set the vertex combiner class (optional)
- *
- * @param vertexCombinerClass Determines how vertex messages are combined
- */
- public final void setVertexCombinerClass(Class<?> vertexCombinerClass) {
- getConfiguration().setClass(VERTEX_COMBINER_CLASS,
- vertexCombinerClass,
- VertexCombiner.class);
- }
-
- /**
- * Set the graph partitioner class (optional)
- *
- * @param graphPartitionerFactoryClass Determines how the graph is partitioned
- */
- public final void setGraphPartitionerFactoryClass(
- Class<?> graphPartitionerFactoryClass) {
- getConfiguration().setClass(GRAPH_PARTITIONER_FACTORY_CLASS,
- graphPartitionerFactoryClass,
- GraphPartitionerFactory.class);
- }
-
- /**
- * Set the vertex resolver class (optional)
- *
- * @param vertexResolverClass Determines how vertex mutations are resolved
- */
- public final void setVertexResolverClass(Class<?> vertexResolverClass) {
- getConfiguration().setClass(VERTEX_RESOLVER_CLASS,
- vertexResolverClass,
- VertexResolver.class);
- }
-
- /**
- * Set the worker context class (optional)
- *
- * @param workerContextClass Determines what code is executed on a each
- * worker before and after each superstep and computation
- */
- public final void setWorkerContextClass(Class<?> workerContextClass) {
- getConfiguration().setClass(WORKER_CONTEXT_CLASS,
- workerContextClass,
- WorkerContext.class);
- }
-
- /**
- * Set the aggregator writer class (optional)
- *
- * @param aggregatorWriterClass Determines how the aggregators are
- * written to file at the end of the job
- */
- public final void setAggregatorWriterClass(
- Class<?> aggregatorWriterClass) {
- getConfiguration().setClass(AGGREGATOR_WRITER_CLASS,
- aggregatorWriterClass,
- AggregatorWriter.class);
- }
-
- /**
- * Set worker configuration for determining what is required for
- * a superstep.
- *
- * @param minWorkers Minimum workers to do a superstep
- * @param maxWorkers Maximum workers to do a superstep
- * (max map tasks in job)
- * @param minPercentResponded 0 - 100 % of the workers required to
- * have responded before continuing the superstep
- */
- public final void setWorkerConfiguration(int minWorkers,
- int maxWorkers,
- float minPercentResponded) {
- conf.setInt(MIN_WORKERS, minWorkers);
- conf.setInt(MAX_WORKERS, maxWorkers);
- conf.setFloat(MIN_PERCENT_RESPONDED, minPercentResponded);
- }
-
- /**
- * Utilize an existing ZooKeeper service. If this is not set, ZooKeeper
- * will be dynamically started by Giraph for this job.
- *
- * @param serverList Comma separated list of servers and ports
- * (i.e. zk1:2221,zk2:2221)
- */
- public final void setZooKeeperConfiguration(String serverList) {
- conf.set(ZOOKEEPER_LIST, serverList);
- }
/**
* Check if the configuration is local. If it is local, do additional
@@ -709,22 +174,21 @@ public class GiraphJob {
* @param conf Configuration
*/
private static void checkLocalJobRunnerConfiguration(
- Configuration conf) {
+ GiraphConfiguration conf) {
String jobTracker = conf.get("mapred.job.tracker", null);
if (!jobTracker.equals("local")) {
// Nothing to check
return;
}
- int maxWorkers = conf.getInt(MAX_WORKERS, -1);
+ int maxWorkers = conf.getMaxWorkers();
if (maxWorkers != 1) {
throw new IllegalArgumentException(
"checkLocalJobRunnerConfiguration: When using " +
"LocalJobRunner, must have only one worker since " +
"only 1 task at a time!");
}
- if (conf.getBoolean(SPLIT_MASTER_WORKER,
- SPLIT_MASTER_WORKER_DEFAULT)) {
+ if (conf.getSplitMasterWorker()) {
throw new IllegalArgumentException(
"checkLocalJobRunnerConfiguration: When using " +
"LocalJobRunner, you cannot run in split master / worker " +
@@ -739,8 +203,9 @@ public class GiraphJob {
* @param defaultValue Assign to value if not set
*/
private void setIntConfIfDefault(String param, int defaultValue) {
- if (conf.getInt(param, Integer.MIN_VALUE) == Integer.MIN_VALUE) {
- conf.setInt(param, defaultValue);
+ // Integer.MIN_VALUE is used as the "not set" sentinel: if getInt
+ // returns it, the parameter is overwritten with defaultValue.
+ if (giraphConfiguration.getInt(param, Integer.MIN_VALUE) ==
+ Integer.MIN_VALUE) {
+ giraphConfiguration.setInt(param, defaultValue);
}
}
@@ -755,9 +220,6 @@ public class GiraphJob {
*/
public final boolean run(boolean verbose)
throws IOException, InterruptedException, ClassNotFoundException {
- checkConfiguration();
- checkLocalJobRunnerConfiguration(conf);
- job.setNumReduceTasks(0);
// Most users won't hit this hopefully and can set it higher if desired
setIntConfIfDefault("mapreduce.job.counters.limit", 512);
@@ -767,22 +229,30 @@ public class GiraphJob {
setIntConfIfDefault("mapred.job.reduce.memory.mb", 1024);
// Speculative execution doesn't make sense for Giraph
- conf.setBoolean("mapred.map.tasks.speculative.execution", false);
+ giraphConfiguration.setBoolean(
+ "mapred.map.tasks.speculative.execution", false);
// Set the ping interval to 5 minutes instead of one minute
// (DEFAULT_PING_INTERVAL)
- Client.setPingInterval(conf, 60000 * 5);
+ Client.setPingInterval(giraphConfiguration, 60000 * 5);
- if (job.getJar() == null) {
- job.setJarByClass(GiraphJob.class);
- }
// Should work in MAPREDUCE-1938 to let the user jars/classes
// get loaded first
- conf.setBoolean("mapreduce.user.classpath.first", true);
+ giraphConfiguration.setBoolean("mapreduce.user.classpath.first", true);
- job.setMapperClass(GraphMapper.class);
- job.setInputFormatClass(BspInputFormat.class);
- job.setOutputFormatClass(BspOutputFormat.class);
- return job.waitForCompletion(verbose);
+ // Set the job properties, check them, and submit the job
+ ImmutableClassesGiraphConfiguration immutableClassesGiraphConfiguration =
+ new ImmutableClassesGiraphConfiguration(giraphConfiguration);
+ checkConfiguration(immutableClassesGiraphConfiguration);
+ checkLocalJobRunnerConfiguration(immutableClassesGiraphConfiguration);
+ Job submittedJob = new Job(immutableClassesGiraphConfiguration, jobName);
+ if (submittedJob.getJar() == null) {
+ submittedJob.setJarByClass(GiraphJob.class);
+ }
+ submittedJob.setNumReduceTasks(0);
+ submittedJob.setMapperClass(GraphMapper.class);
+ submittedJob.setInputFormatClass(BspInputFormat.class);
+ submittedJob.setOutputFormatClass(BspOutputFormat.class);
+ return submittedJob.waitForCompletion(verbose);
}
}
Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/GraphMapper.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/GraphMapper.java?rev=1390014&r1=1390013&r2=1390014&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/GraphMapper.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/GraphMapper.java Tue Sep 25 17:40:18 2012
@@ -18,6 +18,8 @@
package org.apache.giraph.graph;
+import org.apache.giraph.GiraphConfiguration;
+import org.apache.giraph.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.bsp.CentralizedServiceMaster;
import org.apache.giraph.bsp.CentralizedServiceWorker;
import org.apache.giraph.comm.messages.MessageStoreByPartition;
@@ -75,7 +77,7 @@ public class GraphMapper<I extends Writa
/** Manages the ZooKeeper servers if necessary (dynamic startup) */
private ZooKeeperManager zkManager;
/** Configuration */
- private Configuration conf;
+ private ImmutableClassesGiraphConfiguration<I, V, E, M> conf;
/** Already complete? */
private boolean done = false;
/** What kind of functions is this mapper doing? */
@@ -157,25 +159,25 @@ public class GraphMapper<I extends Writa
*/
public void determineClassTypes(Configuration conf) {
Class<? extends Vertex<I, V, E, M>> vertexClass =
- BspUtils.<I, V, E, M>getVertexClass(conf);
+ BspUtils.<I, V, E, M>getVertexClass(conf);
List<Class<?>> classList = ReflectionUtils.<Vertex>getTypeArguments(
- Vertex.class, vertexClass);
+ Vertex.class, vertexClass);
Type vertexIndexType = classList.get(0);
Type vertexValueType = classList.get(1);
Type edgeValueType = classList.get(2);
Type messageValueType = classList.get(3);
- conf.setClass(GiraphJob.VERTEX_ID_CLASS,
- (Class<?>) vertexIndexType,
- WritableComparable.class);
- conf.setClass(GiraphJob.VERTEX_VALUE_CLASS,
- (Class<?>) vertexValueType,
- Writable.class);
- conf.setClass(GiraphJob.EDGE_VALUE_CLASS,
- (Class<?>) edgeValueType,
- Writable.class);
- conf.setClass(GiraphJob.MESSAGE_VALUE_CLASS,
- (Class<?>) messageValueType,
- Writable.class);
+ conf.setClass(GiraphConfiguration.VERTEX_ID_CLASS,
+ (Class<?>) vertexIndexType,
+ WritableComparable.class);
+ conf.setClass(GiraphConfiguration.VERTEX_VALUE_CLASS,
+ (Class<?>) vertexValueType,
+ Writable.class);
+ conf.setClass(GiraphConfiguration.EDGE_VALUE_CLASS,
+ (Class<?>) edgeValueType,
+ Writable.class);
+ conf.setClass(GiraphConfiguration.MESSAGE_VALUE_CLASS,
+ (Class<?>) messageValueType,
+ Writable.class);
}
/**
@@ -223,14 +225,11 @@ public class GraphMapper<I extends Writa
* @return Functions that this mapper should do.
*/
private static MapFunctions determineMapFunctions(
- Configuration conf,
+ ImmutableClassesGiraphConfiguration conf,
ZooKeeperManager zkManager) {
- boolean splitMasterWorker =
- conf.getBoolean(GiraphJob.SPLIT_MASTER_WORKER,
- GiraphJob.SPLIT_MASTER_WORKER_DEFAULT);
- int taskPartition = conf.getInt("mapred.task.partition", -1);
- boolean zkAlreadyProvided =
- conf.get(GiraphJob.ZOOKEEPER_LIST) != null;
+ boolean splitMasterWorker = conf.getSplitMasterWorker();
+ int taskPartition = conf.getTaskPartition();
+ boolean zkAlreadyProvided = conf.getZookeeperList() != null;
MapFunctions functions = MapFunctions.UNKNOWN;
// What functions should this mapper do?
if (!splitMasterWorker) {
@@ -241,9 +240,7 @@ public class GraphMapper<I extends Writa
}
} else {
if (zkAlreadyProvided) {
- int masterCount =
- conf.getInt(GiraphJob.ZOOKEEPER_SERVER_COUNT,
- GiraphJob.ZOOKEEPER_SERVER_COUNT_DEFAULT);
+ int masterCount = conf.getZooKeeperServerCount();
if (taskPartition < masterCount) {
functions = MapFunctions.MASTER_ONLY;
} else {
@@ -268,18 +265,17 @@ public class GraphMapper<I extends Writa
// Setting the default handler for uncaught exceptions.
Thread.setDefaultUncaughtExceptionHandler(
new OverrideExceptionHandler());
- conf = context.getConfiguration();
+ determineClassTypes(context.getConfiguration());
+ conf = new ImmutableClassesGiraphConfiguration<I, V, E, M>(
+ context.getConfiguration());
// Hadoop security needs this property to be set
if (System.getenv("HADOOP_TOKEN_FILE_LOCATION") != null) {
conf.set("mapreduce.job.credentials.binary",
System.getenv("HADOOP_TOKEN_FILE_LOCATION"));
}
- // set pre-validated generic parameter types into Configuration
- determineClassTypes(conf);
// Set the log level
- String logLevel =
- conf.get(GiraphJob.LOG_LEVEL, GiraphJob.LOG_LEVEL_DEFAULT);
+ String logLevel = conf.getLocalLevel();
Logger.getRootLogger().setLevel(Level.toLevel(logLevel));
if (LOG.isInfoEnabled()) {
LOG.info("setup: Set log level to " + logLevel);
@@ -287,8 +283,7 @@ public class GraphMapper<I extends Writa
// Do some initial setup (possibly starting up a Zookeeper service)
context.setStatus("setup: Initializing Zookeeper services.");
- if (!conf.getBoolean(GiraphJob.LOCAL_TEST_MODE,
- GiraphJob.LOCAL_TEST_MODE_DEFAULT)) {
+ if (!conf.getLocalTestMode()) {
Path[] fileClassPaths = DistributedCache.getLocalCacheArchives(conf);
String zkClasspath = null;
if (fileClassPaths == null) {
@@ -314,11 +309,11 @@ public class GraphMapper<I extends Writa
if (LOG.isInfoEnabled()) {
LOG.info("setup: classpath @ " + zkClasspath);
}
- conf.set(GiraphJob.ZOOKEEPER_JAR, zkClasspath);
+ context.getConfiguration().set(
+ GiraphConfiguration.ZOOKEEPER_JAR, zkClasspath);
}
- String serverPortList =
- conf.get(GiraphJob.ZOOKEEPER_LIST, "");
- if (serverPortList.isEmpty()) {
+ String serverPortList = conf.getZookeeperList();
+ if (serverPortList == null) {
zkManager = new ZooKeeperManager(context);
context.setStatus("setup: Setting up Zookeeper manager.");
zkManager.setup();
@@ -334,14 +329,11 @@ public class GraphMapper<I extends Writa
this.mapFunctions = determineMapFunctions(conf, zkManager);
// Sometimes it takes a while to get multiple ZooKeeper servers up
- if (conf.getInt(GiraphJob.ZOOKEEPER_SERVER_COUNT,
- GiraphJob.ZOOKEEPER_SERVER_COUNT_DEFAULT) > 1) {
- Thread.sleep(GiraphJob.DEFAULT_ZOOKEEPER_INIT_LIMIT *
- GiraphJob.DEFAULT_ZOOKEEPER_TICK_TIME);
- }
- int sessionMsecTimeout =
- conf.getInt(GiraphJob.ZOOKEEPER_SESSION_TIMEOUT,
- GiraphJob.ZOOKEEPER_SESSION_TIMEOUT_DEFAULT);
+ if (conf.getZooKeeperServerCount() > 1) {
+ Thread.sleep(GiraphConfiguration.DEFAULT_ZOOKEEPER_INIT_LIMIT *
+ GiraphConfiguration.DEFAULT_ZOOKEEPER_TICK_TIME);
+ }
+ int sessionMsecTimeout = conf.getZooKeeperSessionTimeout();
try {
if ((mapFunctions == MapFunctions.MASTER_ZOOKEEPER_ONLY) ||
(mapFunctions == MapFunctions.MASTER_ONLY) ||
@@ -475,8 +467,7 @@ public class GraphMapper<I extends Writa
serviceWorker.getWorkerContext().preSuperstep();
context.progress();
- boolean useNetty = conf.getBoolean(GiraphJob.USE_NETTY,
- GiraphJob.USE_NETTY_DEFAULT);
+ boolean useNetty = conf.getUseNetty();
MessageStoreByPartition<I, M> messageStore = null;
if (useNetty) {
messageStore = serviceWorker.getServerData().getCurrentMessageStore();
Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/HashMapVertex.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/HashMapVertex.java?rev=1390014&r1=1390013&r2=1390014&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/HashMapVertex.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/HashMapVertex.java Tue Sep 25 17:40:18 2012
@@ -146,18 +146,18 @@ public abstract class HashMapVertex<I ex
@Override
public final void readFields(DataInput in) throws IOException {
- I vertexId = BspUtils.<I>createVertexId(getConf());
+ I vertexId = getConf().createVertexId();
vertexId.readFields(in);
- V vertexValue = BspUtils.<V>createVertexValue(getConf());
+ V vertexValue = getConf().createVertexValue();
vertexValue.readFields(in);
super.initialize(vertexId, vertexValue);
int numEdges = in.readInt();
edgeMap = Maps.newHashMapWithExpectedSize(numEdges);
for (int i = 0; i < numEdges; ++i) {
- I targetVertexId = BspUtils.<I>createVertexId(getConf());
+ I targetVertexId = getConf().createVertexId();
targetVertexId.readFields(in);
- E edgeValue = BspUtils.<E>createEdgeValue(getConf());
+ E edgeValue = getConf().createEdgeValue();
edgeValue.readFields(in);
edgeMap.put(targetVertexId, edgeValue);
}
@@ -165,7 +165,7 @@ public abstract class HashMapVertex<I ex
int numMessages = in.readInt();
messageList = Lists.newArrayListWithCapacity(numMessages);
for (int i = 0; i < numMessages; ++i) {
- M message = BspUtils.<M>createMessageValue(getConf());
+ M message = getConf().createMessageValue();
message.readFields(in);
messageList.add(message);
}
Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/MasterCompute.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/MasterCompute.java?rev=1390014&r1=1390013&r2=1390014&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/MasterCompute.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/MasterCompute.java Tue Sep 25 17:40:18 2012
@@ -18,8 +18,8 @@
package org.apache.giraph.graph;
-import org.apache.hadoop.conf.Configurable;
-import org.apache.hadoop.conf.Configuration;
+import org.apache.giraph.ImmutableClassesGiraphConfigurable;
+import org.apache.giraph.ImmutableClassesGiraphConfiguration;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Mapper;
@@ -41,13 +41,13 @@ import org.apache.hadoop.mapreduce.Mappe
*/
@SuppressWarnings("rawtypes")
public abstract class MasterCompute implements MasterAggregatorUsage, Writable,
- Configurable {
+ ImmutableClassesGiraphConfigurable {
/** If true, do not do any more computation on this vertex. */
private boolean halt = false;
/** Global graph state **/
private GraphState graphState;
/** Configuration */
- private Configuration conf;
+ private ImmutableClassesGiraphConfiguration conf;
/**
* Must be defined by user to specify what the master has to do.
@@ -164,12 +164,12 @@ public abstract class MasterCompute impl
}
@Override
- public Configuration getConf() {
+ public ImmutableClassesGiraphConfiguration getConf() {
return conf;
}
@Override
- public void setConf(Configuration conf) {
+ public void setConf(ImmutableClassesGiraphConfiguration conf) {
this.conf = conf;
}
}
Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/MasterThread.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/MasterThread.java?rev=1390014&r1=1390013&r2=1390014&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/MasterThread.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/MasterThread.java Tue Sep 25 17:40:18 2012
@@ -23,6 +23,7 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.TreeMap;
+import org.apache.giraph.GiraphConfiguration;
import org.apache.giraph.bsp.ApplicationState;
import org.apache.giraph.bsp.CentralizedServiceMaster;
import org.apache.giraph.bsp.SuperstepState;
@@ -74,8 +75,8 @@ public class MasterThread<I extends Writ
this.bspServiceMaster = bspServiceMaster;
this.context = context;
superstepCounterOn = context.getConfiguration().getBoolean(
- GiraphJob.USE_SUPERSTEP_COUNTERS,
- GiraphJob.USE_SUPERSTEP_COUNTERS_DEFAULT);
+ GiraphConfiguration.USE_SUPERSTEP_COUNTERS,
+ GiraphConfiguration.USE_SUPERSTEP_COUNTERS_DEFAULT);
}
/**
Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/MutableVertex.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/MutableVertex.java?rev=1390014&r1=1390013&r2=1390014&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/MutableVertex.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/MutableVertex.java Tue Sep 25 17:40:18 2012
@@ -68,8 +68,7 @@ public abstract class MutableVertex<I ex
public Vertex<I, V, E, M> instantiateVertex(
I vertexId, V vertexValue, Map<I, E> edges, Iterable<M> messages) {
MutableVertex<I, V, E, M> mutableVertex =
- (MutableVertex<I, V, E, M>) BspUtils
- .<I, V, E, M>createVertex(getContext().getConfiguration());
+ (MutableVertex<I, V, E, M>) getConf().createVertex();
mutableVertex.setGraphState(getGraphState());
mutableVertex.initialize(vertexId, vertexValue, edges, messages);
return mutableVertex;
Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/SimpleMutableVertex.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/SimpleMutableVertex.java?rev=1390014&r1=1390013&r2=1390014&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/SimpleMutableVertex.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/SimpleMutableVertex.java Tue Sep 25 17:40:18 2012
@@ -92,15 +92,15 @@ public abstract class SimpleMutableVerte
@Override
public void readFields(DataInput in) throws IOException {
- I vertexId = BspUtils.<I>createVertexId(getConf());
+ I vertexId = (I) getConf().createVertexId();
vertexId.readFields(in);
- V vertexValue = BspUtils.<V>createVertexValue(getConf());
+ V vertexValue = (V) getConf().createVertexValue();
vertexValue.readFields(in);
int numEdges = in.readInt();
Map<I, NullWritable> edges = new HashMap<I, NullWritable>(numEdges);
for (int i = 0; i < numEdges; ++i) {
- I targetVertexId = BspUtils.<I>createVertexId(getConf());
+ I targetVertexId = (I) getConf().createVertexId();
targetVertexId.readFields(in);
edges.put(targetVertexId, NullWritable.get());
}
@@ -108,7 +108,7 @@ public abstract class SimpleMutableVerte
int numMessages = in.readInt();
List<M> messages = new ArrayList<M>(numMessages);
for (int i = 0; i < numMessages; ++i) {
- M message = BspUtils.<M>createMessageValue(getConf());
+ M message = (M) getConf().createMessageValue();
message.readFields(in);
messages.add(message);
}
Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/SimpleVertex.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/SimpleVertex.java?rev=1390014&r1=1390013&r2=1390014&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/SimpleVertex.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/SimpleVertex.java Tue Sep 25 17:40:18 2012
@@ -71,15 +71,15 @@ public abstract class SimpleVertex<I ext
@Override
public void readFields(DataInput in) throws IOException {
- I vertexId = BspUtils.<I>createVertexId(getConf());
+ I vertexId = getConf().createVertexId();
vertexId.readFields(in);
- V vertexValue = BspUtils.<V>createVertexValue(getConf());
+ V vertexValue = getConf().createVertexValue();
vertexValue.readFields(in);
int numEdges = in.readInt();
Map<I, NullWritable> edges = new HashMap<I, NullWritable>(numEdges);
for (int i = 0; i < numEdges; ++i) {
- I targetVertexId = BspUtils.<I>createVertexId(getConf());
+ I targetVertexId = getConf().createVertexId();
targetVertexId.readFields(in);
edges.put(targetVertexId, NullWritable.get());
}
@@ -87,7 +87,7 @@ public abstract class SimpleVertex<I ext
int numMessages = in.readInt();
List<M> messages = new ArrayList<M>(numMessages);
for (int i = 0; i < numMessages; ++i) {
- M message = BspUtils.<M>createMessageValue(getConf());
+ M message = getConf().createMessageValue();
message.readFields(in);
messages.add(message);
}
Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/Vertex.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/Vertex.java?rev=1390014&r1=1390013&r2=1390014&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/Vertex.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/Vertex.java Tue Sep 25 17:40:18 2012
@@ -18,8 +18,8 @@
package org.apache.giraph.graph;
-import org.apache.hadoop.conf.Configurable;
-import org.apache.hadoop.conf.Configuration;
+import org.apache.giraph.ImmutableClassesGiraphConfigurable;
+import org.apache.giraph.ImmutableClassesGiraphConfiguration;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Mapper;
@@ -45,7 +45,8 @@ import java.util.Map;
@SuppressWarnings("rawtypes")
public abstract class Vertex<I extends WritableComparable,
V extends Writable, E extends Writable, M extends Writable>
- implements WorkerAggregatorUsage, Writable, Configurable {
+ implements WorkerAggregatorUsage, Writable,
+ ImmutableClassesGiraphConfigurable<I, V, E, M> {
/** Vertex id. */
private I id;
/** Vertex value. */
@@ -55,12 +56,14 @@ public abstract class Vertex<I extends W
/** Global graph state **/
private GraphState<I, V, E, M> graphState;
/** Configuration */
- private Configuration conf;
+ private ImmutableClassesGiraphConfiguration<I, V, E, M> conf;
/**
- * This method must be called after instantiation of a vertex with BspUtils
- * unless deserialization from readFields() is called.
+ * This method must be called after instantiation of a vertex
+ * with ImmutableClassesGiraphConfiguration
+ * unless deserialization from readFields() is
+ * called.
*
* @param id Will be the vertex id
* @param value Will be the vertex value
@@ -328,17 +331,17 @@ public abstract class Vertex<I extends W
@Override
public void readFields(DataInput in) throws IOException {
- I vertexId = BspUtils.<I>createVertexId(getConf());
+ I vertexId = (I) getConf().createVertexId();
vertexId.readFields(in);
- V vertexValue = BspUtils.<V>createVertexValue(getConf());
+ V vertexValue = (V) getConf().createVertexValue();
vertexValue.readFields(in);
int numEdges = in.readInt();
Map<I, E> edges = new HashMap<I, E>(numEdges);
for (int i = 0; i < numEdges; ++i) {
- I targetVertexId = BspUtils.<I>createVertexId(getConf());
+ I targetVertexId = (I) getConf().createVertexId();
targetVertexId.readFields(in);
- E edgeValue = BspUtils.<E>createEdgeValue(getConf());
+ E edgeValue = (E) getConf().createEdgeValue();
edgeValue.readFields(in);
edges.put(targetVertexId, edgeValue);
}
@@ -346,7 +349,7 @@ public abstract class Vertex<I extends W
int numMessages = in.readInt();
List<M> messages = new ArrayList<M>(numMessages);
for (int i = 0; i < numMessages; ++i) {
- M message = BspUtils.<M>createMessageValue(getConf());
+ M message = (M) getConf().createMessageValue();
message.readFields(in);
messages.add(message);
}
@@ -375,12 +378,12 @@ public abstract class Vertex<I extends W
}
@Override
- public Configuration getConf() {
+ public ImmutableClassesGiraphConfiguration<I, V, E, M> getConf() {
return conf;
}
@Override
- public void setConf(Configuration conf) {
+ public void setConf(ImmutableClassesGiraphConfiguration<I, V, E, M> conf) {
this.conf = conf;
}
Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/VertexMutations.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/VertexMutations.java?rev=1390014&r1=1390013&r2=1390014&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/VertexMutations.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/VertexMutations.java Tue Sep 25 17:40:18 2012
@@ -24,8 +24,8 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
-import org.apache.hadoop.conf.Configurable;
-import org.apache.hadoop.conf.Configuration;
+import org.apache.giraph.ImmutableClassesGiraphConfigurable;
+import org.apache.giraph.ImmutableClassesGiraphConfiguration;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.json.JSONException;
@@ -44,7 +44,7 @@ import org.json.JSONObject;
public class VertexMutations<I extends WritableComparable,
V extends Writable, E extends Writable,
M extends Writable> implements VertexChanges<I, V, E, M>,
- Writable, Configurable {
+ Writable, ImmutableClassesGiraphConfigurable {
/** List of added vertices during the last superstep */
private final List<Vertex<I, V, E, M>> addedVertexList =
new ArrayList<Vertex<I, V, E, M>>();
@@ -55,7 +55,7 @@ public class VertexMutations<I extends W
/** List of removed edges */
private final List<I> removedEdgeList = new ArrayList<I>();
/** Configuration */
- private Configuration conf;
+ private ImmutableClassesGiraphConfiguration<I, V, E, M> conf;
/**
* Copy the vertex mutations.
@@ -85,22 +85,22 @@ public class VertexMutations<I extends W
int addedVertexListSize = input.readInt();
for (int i = 0; i < addedVertexListSize; ++i) {
- Vertex<I, V, E, M> vertex = BspUtils.createVertex(conf);
+ Vertex<I, V, E, M> vertex = conf.createVertex();
vertex.readFields(input);
addedVertexList.add(vertex);
}
removedVertexCount = input.readInt();
int addedEdgeListSize = input.readInt();
for (int i = 0; i < addedEdgeListSize; ++i) {
- I destVertex = BspUtils.<I>createVertexId(conf);
+ I destVertex = conf.createVertexId();
destVertex.readFields(input);
- E edgeValue = BspUtils.<E>createEdgeValue(conf);
+ E edgeValue = conf.createEdgeValue();
edgeValue.readFields(input);
addedEdgeList.add(new Edge<I, E>(destVertex, edgeValue));
}
int removedEdgeListSize = input.readInt();
for (int i = 0; i < removedEdgeListSize; ++i) {
- I removedEdge = BspUtils.<I>createVertexId(conf);
+ I removedEdge = conf.createVertexId();
removedEdge.readFields(input);
removedEdgeList.add(removedEdge);
}
@@ -201,12 +201,12 @@ public class VertexMutations<I extends W
}
@Override
- public Configuration getConf() {
+ public ImmutableClassesGiraphConfiguration getConf() {
return conf;
}
@Override
- public void setConf(Configuration conf) {
+ public void setConf(ImmutableClassesGiraphConfiguration conf) {
this.conf = conf;
}
}
Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/VertexResolver.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/VertexResolver.java?rev=1390014&r1=1390013&r2=1390014&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/VertexResolver.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/VertexResolver.java Tue Sep 25 17:40:18 2012
@@ -18,8 +18,8 @@
package org.apache.giraph.graph;
-import org.apache.hadoop.conf.Configurable;
-import org.apache.hadoop.conf.Configuration;
+import org.apache.giraph.ImmutableClassesGiraphConfigurable;
+import org.apache.giraph.ImmutableClassesGiraphConfiguration;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.log4j.Logger;
@@ -38,11 +38,12 @@ import java.util.List;
@SuppressWarnings("rawtypes")
public class VertexResolver<I extends WritableComparable, V extends Writable,
E extends Writable, M extends Writable>
- implements BasicVertexResolver<I, V, E, M>, Configurable {
+ implements BasicVertexResolver<I, V, E, M>,
+ ImmutableClassesGiraphConfigurable<I, V, E, M> {
/** Class logger */
private static final Logger LOG = Logger.getLogger(VertexResolver.class);
/** Configuration */
- private Configuration conf = null;
+ private ImmutableClassesGiraphConfiguration<I, V, E, M> conf = null;
/** Stored graph state */
private GraphState<I, V, E, M> graphState;
@@ -86,7 +87,7 @@ public class VertexResolver<I extends Wr
if (vertex == null && hasMessages) {
vertex = instantiateVertex();
vertex.initialize(vertexId,
- BspUtils.<V>createVertexValue(getConf()),
+ getConf().createVertexValue(),
null,
null);
}
@@ -114,19 +115,18 @@ public class VertexResolver<I extends Wr
@Override
public Vertex<I, V, E, M> instantiateVertex() {
- Vertex<I, V, E, M> vertex =
- BspUtils.<I, V, E, M>createVertex(getConf());
+ Vertex<I, V, E, M> vertex = getConf().createVertex();
vertex.setGraphState(graphState);
return vertex;
}
@Override
- public Configuration getConf() {
+ public ImmutableClassesGiraphConfiguration<I, V, E, M> getConf() {
return conf;
}
@Override
- public void setConf(Configuration conf) {
+ public void setConf(ImmutableClassesGiraphConfiguration<I, V, E, M> conf) {
this.conf = conf;
}
Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/partition/BasicPartitionOwner.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/BasicPartitionOwner.java?rev=1390014&r1=1390013&r2=1390014&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/partition/BasicPartitionOwner.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/partition/BasicPartitionOwner.java Tue Sep 25 17:40:18 2012
@@ -22,17 +22,18 @@ import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
+import org.apache.giraph.ImmutableClassesGiraphConfigurable;
+import org.apache.giraph.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.graph.WorkerInfo;
-import org.apache.hadoop.conf.Configurable;
-import org.apache.hadoop.conf.Configuration;
/**
* Basic partition owner, can be subclassed for more complicated partition
* owner implementations.
*/
-public class BasicPartitionOwner implements PartitionOwner, Configurable {
+public class BasicPartitionOwner implements PartitionOwner,
+ ImmutableClassesGiraphConfigurable {
/** Configuration */
- private Configuration conf;
+ private ImmutableClassesGiraphConfiguration conf;
/** Partition id */
private int partitionId = -1;
/** Owning worker information */
@@ -145,12 +146,12 @@ public class BasicPartitionOwner impleme
}
@Override
- public Configuration getConf() {
+ public ImmutableClassesGiraphConfiguration getConf() {
return conf;
}
@Override
- public void setConf(Configuration conf) {
+ public void setConf(ImmutableClassesGiraphConfiguration conf) {
this.conf = conf;
}
Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/partition/DiskBackedPartitionStore.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/DiskBackedPartitionStore.java?rev=1390014&r1=1390013&r2=1390014&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/partition/DiskBackedPartitionStore.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/partition/DiskBackedPartitionStore.java Tue Sep 25 17:40:18 2012
@@ -18,10 +18,9 @@
package org.apache.giraph.graph.partition;
-import org.apache.giraph.graph.BspUtils;
-import org.apache.giraph.graph.GiraphJob;
+import org.apache.giraph.GiraphConfiguration;
+import org.apache.giraph.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.graph.Vertex;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.log4j.Logger;
@@ -68,7 +67,7 @@ public class DiskBackedPartitionStore<I
/** Directory on the local file system for storing out-of-core partitions. */
private final String basePath;
/** Configuration. */
- private final Configuration conf;
+ private final ImmutableClassesGiraphConfiguration<I, V, E, M> conf;
/** Slot for loading out-of-core partitions. */
private Partition<I, V, E, M> loadedPartition;
/** Locks for accessing and modifying partitions. */
@@ -80,15 +79,16 @@ public class DiskBackedPartitionStore<I
*
* @param conf Configuration
*/
- public DiskBackedPartitionStore(Configuration conf) {
+ public DiskBackedPartitionStore(
+ ImmutableClassesGiraphConfiguration<I, V, E, M> conf) {
this.conf = conf;
// We must be able to hold at least one partition in memory
maxInMemoryPartitions = Math.max(1,
- conf.getInt(GiraphJob.MAX_PARTITIONS_IN_MEMORY,
- GiraphJob.MAX_PARTITIONS_IN_MEMORY_DEFAULT));
+ conf.getInt(GiraphConfiguration.MAX_PARTITIONS_IN_MEMORY,
+ GiraphConfiguration.MAX_PARTITIONS_IN_MEMORY_DEFAULT));
basePath = conf.get("mapred.job.id", "Unknown Job") +
- conf.get(GiraphJob.PARTITIONS_DIRECTORY,
- GiraphJob.PARTITIONS_DIRECTORY_DEFAULT);
+ conf.get(GiraphConfiguration.PARTITIONS_DIRECTORY,
+ GiraphConfiguration.PARTITIONS_DIRECTORY_DEFAULT);
}
/**
@@ -162,7 +162,7 @@ public class DiskBackedPartitionStore<I
new BufferedInputStream(new FileInputStream(file)));
int numVertices = onDiskPartitions.get(partitionId);
for (int i = 0; i < numVertices; ++i) {
- Vertex<I, V, E, M> vertex = BspUtils.<I, V, E, M>createVertex(conf);
+ Vertex<I, V, E, M> vertex = conf.createVertex();
vertex.readFields(inputStream);
partition.putVertex(vertex);
}
Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/partition/GraphPartitionerFactory.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/GraphPartitionerFactory.java?rev=1390014&r1=1390013&r2=1390014&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/partition/GraphPartitionerFactory.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/partition/GraphPartitionerFactory.java Tue Sep 25 17:40:18 2012
@@ -18,6 +18,7 @@
package org.apache.giraph.graph.partition;
+import org.apache.giraph.ImmutableClassesGiraphConfigurable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
@@ -31,7 +32,8 @@ import org.apache.hadoop.io.WritableComp
*/
@SuppressWarnings("rawtypes")
public interface GraphPartitionerFactory<I extends WritableComparable,
- V extends Writable, E extends Writable, M extends Writable> {
+ V extends Writable, E extends Writable, M extends Writable> extends
+ ImmutableClassesGiraphConfigurable {
/**
* Create the {@link MasterGraphPartitioner} used by the master.
* Instantiated once by the master and reused.
Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/partition/HashMasterPartitioner.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/HashMasterPartitioner.java?rev=1390014&r1=1390013&r2=1390014&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/partition/HashMasterPartitioner.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/partition/HashMasterPartitioner.java Tue Sep 25 17:40:18 2012
@@ -23,8 +23,8 @@ import java.util.Collection;
import java.util.Iterator;
import java.util.List;
+import org.apache.giraph.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.graph.WorkerInfo;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.log4j.Logger;
@@ -59,7 +59,7 @@ public class HashMasterPartitioner<I ext
*/
private static final int MAX_PARTTIONS = 1024 * 1024 / 350;
/** Provided configuration */
- private Configuration conf;
+ private ImmutableClassesGiraphConfiguration conf;
/** Specified partition count (overrides calculation) */
private final int userPartitionCount;
/** Partition count (calculated in createInitialPartitionOwners) */
@@ -72,7 +72,7 @@ public class HashMasterPartitioner<I ext
*
*@param conf Configuration used.
*/
- public HashMasterPartitioner(Configuration conf) {
+ public HashMasterPartitioner(ImmutableClassesGiraphConfiguration conf) {
this.conf = conf;
userPartitionCount = conf.getInt(USER_PARTITION_COUNT,
DEFAULT_USER_PARTITION_COUNT);
Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/partition/HashPartitionerFactory.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/HashPartitionerFactory.java?rev=1390014&r1=1390013&r2=1390014&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/partition/HashPartitionerFactory.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/partition/HashPartitionerFactory.java Tue Sep 25 17:40:18 2012
@@ -18,8 +18,7 @@
package org.apache.giraph.graph.partition;
-import org.apache.hadoop.conf.Configurable;
-import org.apache.hadoop.conf.Configuration;
+import org.apache.giraph.ImmutableClassesGiraphConfiguration;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
@@ -35,9 +34,9 @@ import org.apache.hadoop.io.WritableComp
@SuppressWarnings("rawtypes")
public class HashPartitionerFactory<I extends WritableComparable,
V extends Writable, E extends Writable, M extends Writable>
- implements Configurable, GraphPartitionerFactory<I, V, E, M> {
+ implements GraphPartitionerFactory<I, V, E, M> {
/** Saved configuration */
- private Configuration conf;
+ private ImmutableClassesGiraphConfiguration conf;
@Override
public MasterGraphPartitioner<I, V, E, M> createMasterGraphPartitioner() {
@@ -50,12 +49,12 @@ public class HashPartitionerFactory<I ex
}
@Override
- public Configuration getConf() {
+ public ImmutableClassesGiraphConfiguration getConf() {
return conf;
}
@Override
- public void setConf(Configuration conf) {
+ public void setConf(ImmutableClassesGiraphConfiguration conf) {
this.conf = conf;
}
}
Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/partition/HashRangePartitionerFactory.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/HashRangePartitionerFactory.java?rev=1390014&r1=1390013&r2=1390014&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/partition/HashRangePartitionerFactory.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/partition/HashRangePartitionerFactory.java Tue Sep 25 17:40:18 2012
@@ -18,8 +18,7 @@
package org.apache.giraph.graph.partition;
-import org.apache.hadoop.conf.Configurable;
-import org.apache.hadoop.conf.Configuration;
+import org.apache.giraph.ImmutableClassesGiraphConfiguration;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
@@ -35,9 +34,9 @@ import org.apache.hadoop.io.WritableComp
@SuppressWarnings("rawtypes")
public class HashRangePartitionerFactory<I extends WritableComparable,
V extends Writable, E extends Writable, M extends Writable>
- implements Configurable, GraphPartitionerFactory<I, V, E, M> {
+ implements GraphPartitionerFactory<I, V, E, M> {
/** Saved configuration */
- private Configuration conf;
+ private ImmutableClassesGiraphConfiguration<I, V, E, M> conf;
@Override
public MasterGraphPartitioner<I, V, E, M> createMasterGraphPartitioner() {
@@ -50,12 +49,12 @@ public class HashRangePartitionerFactory
}
@Override
- public Configuration getConf() {
+ public ImmutableClassesGiraphConfiguration getConf() {
return conf;
}
@Override
- public void setConf(Configuration conf) {
+ public void setConf(ImmutableClassesGiraphConfiguration conf) {
this.conf = conf;
}
}
Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/partition/Partition.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/Partition.java?rev=1390014&r1=1390013&r2=1390014&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/partition/Partition.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/partition/Partition.java Tue Sep 25 17:40:18 2012
@@ -18,10 +18,9 @@
package org.apache.giraph.graph.partition;
-import org.apache.giraph.graph.BspUtils;
-import org.apache.giraph.graph.GiraphJob;
+import org.apache.giraph.GiraphConfiguration;
+import org.apache.giraph.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.graph.Vertex;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
@@ -48,7 +47,7 @@ public class Partition<I extends Writabl
V extends Writable, E extends Writable, M extends Writable>
implements Writable {
/** Configuration from the worker */
- private final Configuration conf;
+ private final ImmutableClassesGiraphConfiguration<I, V, E, M> conf;
/** Partition id */
private final int id;
/** Vertex map for this range (keyed by index) */
@@ -60,11 +59,12 @@ public class Partition<I extends Writabl
* @param conf Configuration.
* @param id Partition id.
*/
- public Partition(Configuration conf, int id) {
+ public Partition(ImmutableClassesGiraphConfiguration<I, V, E, M> conf,
+ int id) {
this.conf = conf;
this.id = id;
- if (conf.getBoolean(GiraphJob.USE_OUT_OF_CORE_MESSAGES,
- GiraphJob.USE_OUT_OF_CORE_MESSAGES_DEFAULT)) {
+ if (conf.getBoolean(GiraphConfiguration.USE_OUT_OF_CORE_MESSAGES,
+ GiraphConfiguration.USE_OUT_OF_CORE_MESSAGES_DEFAULT)) {
vertexMap = new ConcurrentSkipListMap<I, Vertex<I, V, E, M>>();
} else {
vertexMap = Maps.newConcurrentMap();
@@ -153,8 +153,7 @@ public class Partition<I extends Writabl
public void readFields(DataInput input) throws IOException {
int vertices = input.readInt();
for (int i = 0; i < vertices; ++i) {
- Vertex<I, V, E, M> vertex =
- BspUtils.<I, V, E, M>createVertex(conf);
+ Vertex<I, V, E, M> vertex = conf.createVertex();
vertex.readFields(input);
if (vertexMap.put(vertex.getId(),
(Vertex<I, V, E, M>) vertex) != null) {
Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/partition/RangePartitionOwner.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/RangePartitionOwner.java?rev=1390014&r1=1390013&r2=1390014&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/partition/RangePartitionOwner.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/partition/RangePartitionOwner.java Tue Sep 25 17:40:18 2012
@@ -22,7 +22,6 @@ import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
-import org.apache.giraph.graph.BspUtils;
import org.apache.hadoop.io.WritableComparable;
/**
@@ -63,7 +62,7 @@ public class RangePartitionOwner<I exten
@Override
public void readFields(DataInput input) throws IOException {
super.readFields(input);
- maxIndex = BspUtils.<I>createVertexId(getConf());
+ maxIndex = (I) getConf().createVertexId();
maxIndex.readFields(input);
}
Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/partition/RangeSplitHint.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/RangeSplitHint.java?rev=1390014&r1=1390013&r2=1390014&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/partition/RangeSplitHint.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/partition/RangeSplitHint.java Tue Sep 25 17:40:18 2012
@@ -22,9 +22,8 @@ import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
-import org.apache.giraph.graph.BspUtils;
-import org.apache.hadoop.conf.Configurable;
-import org.apache.hadoop.conf.Configuration;
+import org.apache.giraph.ImmutableClassesGiraphConfigurable;
+import org.apache.giraph.ImmutableClassesGiraphConfiguration;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
@@ -36,7 +35,7 @@ import org.apache.hadoop.io.WritableComp
*/
@SuppressWarnings("rawtypes")
public class RangeSplitHint<I extends WritableComparable>
- implements Writable, Configurable {
+ implements Writable, ImmutableClassesGiraphConfigurable {
/** Hinted split index */
private I splitIndex;
/** Number of vertices in this range before the split */
@@ -44,11 +43,11 @@ public class RangeSplitHint<I extends Wr
/** Number of vertices in this range after the split */
private long postSplitVertexCount;
/** Configuration */
- private Configuration conf;
+ private ImmutableClassesGiraphConfiguration<I, ?, ?, ?> conf;
@Override
public void readFields(DataInput input) throws IOException {
- splitIndex = BspUtils.<I>createVertexId(conf);
+ splitIndex = conf.createVertexId();
splitIndex.readFields(input);
preSplitVertexCount = input.readLong();
postSplitVertexCount = input.readLong();
@@ -62,12 +61,12 @@ public class RangeSplitHint<I extends Wr
}
@Override
- public Configuration getConf() {
+ public ImmutableClassesGiraphConfiguration getConf() {
return conf;
}
@Override
- public void setConf(Configuration conf) {
+ public void setConf(ImmutableClassesGiraphConfiguration conf) {
this.conf = conf;
}
}
Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/partition/SimplePartitionStore.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/SimplePartitionStore.java?rev=1390014&r1=1390013&r2=1390014&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/partition/SimplePartitionStore.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/partition/SimplePartitionStore.java Tue Sep 25 17:40:18 2012
@@ -18,8 +18,8 @@
package org.apache.giraph.graph.partition;
+import org.apache.giraph.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.graph.Vertex;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
@@ -43,14 +43,15 @@ public class SimplePartitionStore<I exte
private final ConcurrentMap<Integer, Partition<I, V, E, M>> partitions =
Maps.newConcurrentMap();
/** Configuration. */
- private final Configuration conf;
+ private final ImmutableClassesGiraphConfiguration<I, V, E, M> conf;
/**
* Constructor.
*
* @param conf Configuration
*/
- public SimplePartitionStore(Configuration conf) {
+ public SimplePartitionStore(
+ ImmutableClassesGiraphConfiguration<I, V, E, M> conf) {
this.conf = conf;
}
Modified: giraph/trunk/src/main/java/org/apache/giraph/integration/SuperstepHashPartitionerFactory.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/integration/SuperstepHashPartitionerFactory.java?rev=1390014&r1=1390013&r2=1390014&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/integration/SuperstepHashPartitionerFactory.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/integration/SuperstepHashPartitionerFactory.java Tue Sep 25 17:40:18 2012
@@ -22,6 +22,7 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
+import org.apache.giraph.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.graph.WorkerInfo;
import org.apache.giraph.graph.partition.BasicPartitionOwner;
import org.apache.giraph.graph.partition.HashMasterPartitioner;
@@ -29,7 +30,6 @@ import org.apache.giraph.graph.partition
import org.apache.giraph.graph.partition.MasterGraphPartitioner;
import org.apache.giraph.graph.partition.PartitionOwner;
import org.apache.giraph.graph.partition.PartitionStats;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.log4j.Logger;
@@ -70,7 +70,7 @@ public class SuperstepHashPartitionerFac
*
* @param conf Configuration to be stored.
*/
- public SuperstepMasterPartition(Configuration conf) {
+ public SuperstepMasterPartition(ImmutableClassesGiraphConfiguration conf) {
super(conf);
}
Modified: giraph/trunk/src/main/java/org/apache/giraph/io/JsonBase64VertexInputFormat.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/io/JsonBase64VertexInputFormat.java?rev=1390014&r1=1390013&r2=1390014&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/io/JsonBase64VertexInputFormat.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/io/JsonBase64VertexInputFormat.java Tue Sep 25 17:40:18 2012
@@ -18,8 +18,6 @@
package org.apache.giraph.io;
-import org.apache.giraph.graph.BspUtils;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
@@ -66,14 +64,11 @@ public class JsonBase64VertexInputFormat
protected class JsonBase64VertexReader extends
TextVertexReaderFromEachLineProcessed<JSONObject> {
- /** Cached configuration */
- private Configuration conf;
@Override
public void initialize(InputSplit inputSplit, TaskAttemptContext context)
throws IOException, InterruptedException {
super.initialize(inputSplit, context);
- conf = context.getConfiguration();
}
@Override
@@ -93,7 +88,7 @@ public class JsonBase64VertexInputFormat
vertexObject.getString(JsonBase64VertexFormat.VERTEX_ID_KEY));
DataInput input = new DataInputStream(
new ByteArrayInputStream(decodedWritable));
- I vertexId = BspUtils.<I>createVertexId(conf);
+ I vertexId = getConf().createVertexId();
vertexId.readFields(input);
return vertexId;
} catch (JSONException e) {
@@ -109,7 +104,7 @@ public class JsonBase64VertexInputFormat
vertexObject.getString(JsonBase64VertexFormat.VERTEX_VALUE_KEY));
DataInputStream input = new DataInputStream(
new ByteArrayInputStream(decodedWritable));
- V vertexValue = BspUtils.<V>createVertexValue(conf);
+ V vertexValue = getConf().createVertexValue();
vertexValue.readFields(input);
return vertexValue;
} catch (JSONException e) {
@@ -139,11 +134,9 @@ public class JsonBase64VertexInputFormat
}
DataInputStream input = new DataInputStream(
new ByteArrayInputStream(decodedWritable));
- I targetVertexId =
- BspUtils.<I>createVertexId(getContext().getConfiguration());
+ I targetVertexId = getConf().createVertexId();
targetVertexId.readFields(input);
- E edgeValue =
- BspUtils.<E>createEdgeValue(getContext().getConfiguration());
+ E edgeValue = getConf().createEdgeValue();
edgeValue.readFields(input);
edgeMap.put(targetVertexId, edgeValue);
}
Modified: giraph/trunk/src/main/java/org/apache/giraph/io/PseudoRandomVertexInputFormat.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/io/PseudoRandomVertexInputFormat.java?rev=1390014&r1=1390013&r2=1390014&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/io/PseudoRandomVertexInputFormat.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/io/PseudoRandomVertexInputFormat.java Tue Sep 25 17:40:18 2012
@@ -18,12 +18,11 @@
package org.apache.giraph.io;
+import org.apache.giraph.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.bsp.BspInputSplit;
-import org.apache.giraph.graph.BspUtils;
import org.apache.giraph.graph.Vertex;
import org.apache.giraph.graph.VertexInputFormat;
import org.apache.giraph.graph.VertexReader;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Writable;
@@ -98,7 +97,7 @@ public class PseudoRandomVertexInputForm
/** BspInputSplit (used only for index). */
private BspInputSplit bspInputSplit;
/** Saved configuration */
- private Configuration configuration;
+ private ImmutableClassesGiraphConfiguration configuration;
/**
* Default constructor for reflection.
@@ -109,7 +108,8 @@ public class PseudoRandomVertexInputForm
@Override
public void initialize(InputSplit inputSplit,
TaskAttemptContext context) throws IOException {
- configuration = context.getConfiguration();
+ configuration = new ImmutableClassesGiraphConfiguration(
+ context.getConfiguration());
aggregateVertices =
configuration.getLong(
PseudoRandomVertexInputFormat.AGGREGATE_VERTICES, 0);
@@ -152,7 +152,7 @@ public class PseudoRandomVertexInputForm
public Vertex<LongWritable, DoubleWritable, DoubleWritable, M>
getCurrentVertex() throws IOException, InterruptedException {
Vertex<LongWritable, DoubleWritable, DoubleWritable, M>
- vertex = BspUtils.createVertex(configuration);
+ vertex = configuration.createVertex();
long vertexId = startingVertexId + verticesRead;
// Seed on the vertex id to keep the vertex data the same when
// on different number of workers, but other parameters are the
@@ -174,9 +174,9 @@ public class PseudoRandomVertexInputForm
++verticesRead;
if (LOG.isTraceEnabled()) {
LOG.trace("next: Return vertexId=" +
- vertex.getId().get() +
- ", vertexValue=" + vertex.getValue() +
- ", edges=" + vertex.getEdges());
+ vertex.getId().get() +
+ ", vertexValue=" + vertex.getValue() +
+ ", edges=" + vertex.getEdges());
}
return vertex;
}
Modified: giraph/trunk/src/main/java/org/apache/giraph/io/TextVertexInputFormat.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/io/TextVertexInputFormat.java?rev=1390014&r1=1390013&r2=1390014&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/io/TextVertexInputFormat.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/io/TextVertexInputFormat.java Tue Sep 25 17:40:18 2012
@@ -18,7 +18,7 @@
package org.apache.giraph.io;
-import org.apache.giraph.graph.BspUtils;
+import org.apache.giraph.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.graph.Vertex;
import org.apache.giraph.graph.VertexInputFormat;
import org.apache.giraph.graph.VertexReader;
@@ -94,11 +94,15 @@ public abstract class TextVertexInputFor
private RecordReader<LongWritable, Text> lineRecordReader;
/** Context passed to initialize */
private TaskAttemptContext context;
+ /** Cached configuration */
+ private ImmutableClassesGiraphConfiguration<I, V, E, M> conf;
@Override
public void initialize(InputSplit inputSplit, TaskAttemptContext context)
throws IOException, InterruptedException {
this.context = context;
+ conf = new ImmutableClassesGiraphConfiguration<I, V, E, M>(
+ context.getConfiguration());
lineRecordReader = createLineRecordReader(inputSplit, context);
lineRecordReader.initialize(inputSplit, context);
}
@@ -151,6 +155,15 @@ public abstract class TextVertexInputFor
protected TaskAttemptContext getContext() {
return context;
}
+
+ /**
+ * Get the configuration.
+ *
+ * @return Configuration for this reader
+ */
+ protected ImmutableClassesGiraphConfiguration<I, V, E, M> getConf() {
+ return conf;
+ }
}
/**
@@ -164,8 +177,7 @@ public abstract class TextVertexInputFor
public final Vertex<I, V, E, M> getCurrentVertex() throws IOException,
InterruptedException {
Text line = getRecordReader().getCurrentValue();
- Vertex<I, V, E, M> vertex = BspUtils
- .<I, V, E, M>createVertex(getContext().getConfiguration());
+ Vertex<I, V, E, M> vertex = getConf().createVertex();
vertex.initialize(getId(line), getValue(line), getEdges(line), null);
return vertex;
}
@@ -234,8 +246,7 @@ public abstract class TextVertexInputFor
Text line = getRecordReader().getCurrentValue();
Vertex<I, V, E, M> vertex;
T processed = preprocessLine(line);
- vertex = BspUtils
- .<I, V, E, M>createVertex(getContext().getConfiguration());
+ vertex = getConf().createVertex();
vertex.initialize(getId(processed), getValue(processed),
getEdges(processed), null);
return vertex;
@@ -322,8 +333,7 @@ public abstract class TextVertexInputFor
try {
processed = preprocessLine(line);
Configuration conf = getContext().getConfiguration();
- vertex = BspUtils
- .<I, V, E, M>createVertex(conf);
+ vertex = getConf().createVertex();
vertex.initialize(getId(processed), getValue(processed),
getEdges(processed), null);
} catch (IOException e) {