You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ac...@apache.org on 2012/02/16 23:12:36 UTC
svn commit: r1245205 [12/18] - in /incubator/giraph/trunk: ./
src/main/java/org/apache/giraph/ src/main/java/org/apache/giraph/benchmark/
src/main/java/org/apache/giraph/bsp/ src/main/java/org/apache/giraph/comm/
src/main/java/org/apache/giraph/example...
Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GraphMapper.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GraphMapper.java?rev=1245205&r1=1245204&r2=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GraphMapper.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GraphMapper.java Thu Feb 16 22:12:31 2012
@@ -48,598 +48,614 @@ import java.util.List;
* This mapper will execute the BSP graph tasks. Since this mapper will
* not be passing data by key-value pairs through the MR framework, the
* types are irrelevant.
+ *
+ * @param <I> Vertex id
+ * @param <V> Vertex data
+ * @param <E> Edge data
+ * @param <M> Message data
*/
@SuppressWarnings("rawtypes")
public class GraphMapper<I extends WritableComparable, V extends Writable,
- E extends Writable, M extends Writable> extends
- Mapper<Object, Object, Object, Object> {
- /** Class logger */
- private static final Logger LOG = Logger.getLogger(GraphMapper.class);
- /** Coordination service worker */
- CentralizedServiceWorker<I, V, E, M> serviceWorker;
- /** Coordination service master thread */
- Thread masterThread = null;
- /** The map should be run exactly once, or else there is a problem. */
- boolean mapAlreadyRun = false;
- /** Manages the ZooKeeper servers if necessary (dynamic startup) */
- private ZooKeeperManager zkManager;
- /** Configuration */
- private Configuration conf;
- /** Already complete? */
- private boolean done = false;
- /** What kind of functions is this mapper doing? */
- private MapFunctions mapFunctions = MapFunctions.UNKNOWN;
- /**
- * Graph state for all vertices that is used for the duration of
- * this mapper.
- */
- private GraphState<I,V,E,M> graphState = new GraphState<I, V, E, M>();
-
- /** What kinds of functions to run on this mapper */
- public enum MapFunctions {
- UNKNOWN,
- MASTER_ONLY,
- MASTER_ZOOKEEPER_ONLY,
- WORKER_ONLY,
- ALL,
- ALL_EXCEPT_ZOOKEEPER
- }
-
- /**
- * Get the map function enum
- */
- public MapFunctions getMapFunctions() {
- return mapFunctions;
- }
-
- /**
- * Get the aggregator usage, a subset of the functionality
- *
- * @return Aggregator usage interface
- */
- public final AggregatorUsage getAggregatorUsage() {
- return serviceWorker;
- }
-
- public final WorkerContext getWorkerContext() {
- return serviceWorker.getWorkerContext();
- }
-
- public final GraphState<I,V,E,M> getGraphState() {
- return graphState;
- }
-
- /**
- * Default handler for uncaught exceptions.
- */
- class OverrideExceptionHandler
- implements Thread.UncaughtExceptionHandler {
- public void uncaughtException(Thread t, Throwable e) {
- LOG.fatal(
- "uncaughtException: OverrideExceptionHandler on thread " +
- t.getName() + ", msg = " + e.getMessage() +
- ", exiting...", e);
- System.exit(1);
- }
- }
-
- /**
- * Copied from JobConf to get the location of this jar. Workaround for
- * things like Oozie map-reduce jobs.
- *
- * @param my_class Class to search the class loader path for to locate
- * the relevant jar file
- * @return Location of the jar file containing my_class
- */
- private static String findContainingJar(Class<?> my_class) {
- ClassLoader loader = my_class.getClassLoader();
- String class_file =
- my_class.getName().replaceAll("\\.", "/") + ".class";
- try {
- for(Enumeration<?> itr = loader.getResources(class_file);
- itr.hasMoreElements();) {
- URL url = (URL) itr.nextElement();
- if ("jar".equals(url.getProtocol())) {
- String toReturn = url.getPath();
- if (toReturn.startsWith("file:")) {
- toReturn = toReturn.substring("file:".length());
- }
- toReturn = URLDecoder.decode(toReturn, "UTF-8");
- return toReturn.replaceAll("!.*$", "");
- }
- }
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- return null;
- }
-
- /**
- * Make sure that all registered classes have matching types. This
- * is a little tricky due to type erasure, cannot simply get them from
- * the class type arguments. Also, set the vertex index, vertex value,
- * edge value and message value classes.
- *
- * @param conf Configuration to get the various classes
- */
- public void determineClassTypes(Configuration conf) {
- Class<? extends BasicVertex<I, V, E, M>> vertexClass =
- BspUtils.<I, V, E, M>getVertexClass(conf);
- List<Class<?>> classList = ReflectionUtils.<BasicVertex>getTypeArguments(
- BasicVertex.class, vertexClass);
- Type vertexIndexType = classList.get(0);
- Type vertexValueType = classList.get(1);
- Type edgeValueType = classList.get(2);
- Type messageValueType = classList.get(3);
-
- Class<? extends VertexInputFormat<I, V, E, M>> vertexInputFormatClass =
- BspUtils.<I, V, E, M>getVertexInputFormatClass(conf);
- classList = ReflectionUtils.<VertexInputFormat>getTypeArguments(
- VertexInputFormat.class, vertexInputFormatClass);
- if (classList.get(0) == null) {
- LOG.warn("Input format vertex index type is not known");
- } else if (!vertexIndexType.equals(classList.get(0))) {
- throw new IllegalArgumentException(
- "checkClassTypes: Vertex index types don't match, " +
+ E extends Writable, M extends Writable> extends
+ Mapper<Object, Object, Object, Object> {
+ /** Class logger */
+ private static final Logger LOG = Logger.getLogger(GraphMapper.class);
+ /** Coordination service worker, created in setup() for worker roles */
+ private CentralizedServiceWorker<I, V, E, M> serviceWorker;
+ /** Coordination service master thread, started in setup() for master roles */
+ private Thread masterThread = null;
+ /** The map should be run exactly once, or else there is a problem. */
+ private boolean mapAlreadyRun = false;
+ /** Manages the ZooKeeper servers if necessary (dynamic startup) */
+ private ZooKeeperManager zkManager;
+ /** Configuration, taken from the mapper context in setup() */
+ private Configuration conf;
+ /** Set in setup() when the ZooKeeper manager reports computation done */
+ private boolean done = false;
+ /** Functions this mapper performs, decided in setup() */
+ private MapFunctions mapFunctions = MapFunctions.UNKNOWN;
+ /**
+ * Graph state for all vertices that is used for the duration of
+ * this mapper.
+ */
+ private GraphState<I, V, E, M> graphState = new GraphState<I, V, E, M>();
+
+ /** The combinations of functions (roles) a mapper can be asked to run */
+ public enum MapFunctions {
+ /** Not decided yet (initial value of the mapFunctions field) */
+ UNKNOWN,
+ /** Only be the master */
+ MASTER_ONLY,
+ /** Only be the master and run ZooKeeper */
+ MASTER_ZOOKEEPER_ONLY,
+ /** Only be the worker */
+ WORKER_ONLY,
+ /** Do master, worker, and ZooKeeper */
+ ALL,
+ /** Do master and worker, but not ZooKeeper */
+ ALL_EXCEPT_ZOOKEEPER
+ }
+
+ /**
+  * Report which functions this mapper has been assigned.
+  *
+  * @return Map functions of this mapper.
+  */
+ public MapFunctions getMapFunctions() {
+   return this.mapFunctions;
+ }
+
+ /**
+  * Expose the service worker through its aggregator usage interface,
+  * which is a subset of the worker's functionality.
+  *
+  * @return Aggregator usage interface
+  */
+ public final AggregatorUsage getAggregatorUsage() {
+   return this.serviceWorker;
+ }
+
+ /**
+  * Get the worker context held by the coordination service worker.
+  *
+  * @return Worker context obtained from the service worker
+  */
+ public final WorkerContext getWorkerContext() {
+   return this.serviceWorker.getWorkerContext();
+ }
+
+ /**
+  * Get the graph state object owned by this mapper.
+  *
+  * @return Graph state of this mapper
+  */
+ public final GraphState<I, V, E, M> getGraphState() {
+   return this.graphState;
+ }
+
+ /**
+ * Default handler for uncaught exceptions.
+ */
+ class OverrideExceptionHandler implements Thread.UncaughtExceptionHandler {
+ @Override
+ public void uncaughtException(Thread t, Throwable e) {
+ LOG.fatal(
+ "uncaughtException: OverrideExceptionHandler on thread " +
+ t.getName() + ", msg = " + e.getMessage() +
+ ", exiting...", e);
+ System.exit(1);
+ }
+ }
+
+ /**
+  * Copied from JobConf to get the location of this jar. Workaround for
+  * things like Oozie map-reduce jobs.
+  *
+  * @param myClass Class to search the class loader path for to locate
+  *        the relevant jar file
+  * @return Location of the jar file containing myClass, or null when no
+  *         resource for the class is served with the "jar" protocol
+  */
+ private static String findContainingJar(Class<?> myClass) {
+   ClassLoader loader = myClass.getClassLoader();
+   // Literal replacement; no regular expression is needed here.
+   String classFile = myClass.getName().replace('.', '/') + ".class";
+   try {
+     for (Enumeration<?> itr = loader.getResources(classFile);
+          itr.hasMoreElements();) {
+       URL url = (URL) itr.nextElement();
+       if ("jar".equals(url.getProtocol())) {
+         String toReturn = url.getPath();
+         if (toReturn.startsWith("file:")) {
+           toReturn = toReturn.substring("file:".length());
+         }
+         // URLDecoder implements form decoding, which turns '+' into a
+         // space. A literal '+' in the jar path must survive, so it is
+         // escaped before decoding.
+         toReturn = toReturn.replace("+", "%2B");
+         toReturn = URLDecoder.decode(toReturn, "UTF-8");
+         // Strip the "!/path/inside/jar" suffix, leaving the jar location.
+         return toReturn.replaceAll("!.*$", "");
+       }
+     }
+   } catch (IOException e) {
+     throw new RuntimeException(e);
+   }
+   return null;
+ }
+
+ /**
+ * Make sure that all registered classes have matching types. This
+ * is a little tricky due to type erasure, cannot simply get them from
+ * the class type arguments. Also, set the vertex index, vertex value,
+ * edge value and message value classes.
+ *
+ * @param conf Configuration to get the various classes
+ * @throws IllegalArgumentException if a registered class declares a type
+ * argument that differs from the one declared by the vertex class
+ */
+ public void determineClassTypes(Configuration conf) {
+ Class<? extends BasicVertex<I, V, E, M>> vertexClass =
+ BspUtils.<I, V, E, M>getVertexClass(conf);
+ List<Class<?>> classList = ReflectionUtils.<BasicVertex>getTypeArguments(
+ BasicVertex.class, vertexClass);
+ Type vertexIndexType = classList.get(0);
+ Type vertexValueType = classList.get(1);
+ Type edgeValueType = classList.get(2);
+ Type messageValueType = classList.get(3);
+
+ Class<? extends VertexInputFormat<I, V, E, M>> vertexInputFormatClass =
+ BspUtils.<I, V, E, M>getVertexInputFormatClass(conf);
+ classList = ReflectionUtils.<VertexInputFormat>getTypeArguments(
+ VertexInputFormat.class, vertexInputFormatClass);
+ if (classList.get(0) == null) {
+ LOG.warn("Input format vertex index type is not known");
+ } else if (!vertexIndexType.equals(classList.get(0))) {
+ throw new IllegalArgumentException(
+ "checkClassTypes: Vertex index types don't match, " +
+ "vertex - " + vertexIndexType +
+ ", vertex input format - " + classList.get(0));
+ }
+ if (classList.get(1) == null) {
+ LOG.warn("Input format vertex value type is not known");
+ } else if (!vertexValueType.equals(classList.get(1))) {
+ throw new IllegalArgumentException(
+ "checkClassTypes: Vertex value types don't match, " +
+ "vertex - " + vertexValueType +
+ ", vertex input format - " + classList.get(1));
+ }
+ if (classList.get(2) == null) {
+ LOG.warn("Input format edge value type is not known");
+ } else if (!edgeValueType.equals(classList.get(2))) {
+ throw new IllegalArgumentException(
+ "checkClassTypes: Edge value types don't match, " +
+ "vertex - " + edgeValueType +
+ ", vertex input format - " + classList.get(2));
+ }
+ // If has vertex combiner class, check
+ Class<? extends VertexCombiner<I, M>> vertexCombinerClass =
+ BspUtils.<I, M>getVertexCombinerClass(conf);
+ if (vertexCombinerClass != null) {
+ classList = ReflectionUtils.<VertexCombiner>getTypeArguments(
+ VertexCombiner.class, vertexCombinerClass);
+ if (!vertexIndexType.equals(classList.get(0))) {
+ throw new IllegalArgumentException(
+ "checkClassTypes: Vertex index types don't match, " +
"vertex - " + vertexIndexType +
- ", vertex input format - " + classList.get(0));
- }
- if (classList.get(1) == null) {
- LOG.warn("Input format vertex value type is not known");
- } else if (!vertexValueType.equals(classList.get(1))) {
- throw new IllegalArgumentException(
- "checkClassTypes: Vertex value types don't match, " +
+ ", vertex combiner - " + classList.get(0));
+ }
+ if (!messageValueType.equals(classList.get(1))) {
+ throw new IllegalArgumentException(
+ "checkClassTypes: Message value types don't match, " +
- "vertex - " + vertexValueType +
+ "vertex - " + messageValueType +
- ", vertex input format - " + classList.get(1));
- }
- if (classList.get(2) == null) {
- LOG.warn("Input format edge value type is not known");
- } else if (!edgeValueType.equals(classList.get(2))) {
- throw new IllegalArgumentException(
- "checkClassTypes: Edge value types don't match, " +
- "vertex - " + edgeValueType +
- ", vertex input format - " + classList.get(2));
- }
- // If has vertex combiner class, check
- Class<? extends VertexCombiner<I, M>> vertexCombinerClass =
- BspUtils.<I, M>getVertexCombinerClass(conf);
- if (vertexCombinerClass != null) {
- classList = ReflectionUtils.<VertexCombiner>getTypeArguments(
- VertexCombiner.class, vertexCombinerClass);
- if (!vertexIndexType.equals(classList.get(0))) {
- throw new IllegalArgumentException(
- "checkClassTypes: Vertex index types don't match, " +
- "vertex - " + vertexIndexType +
- ", vertex combiner - " + classList.get(0));
- }
- if (!messageValueType.equals(classList.get(1))) {
- throw new IllegalArgumentException(
- "checkClassTypes: Message value types don't match, " +
- "vertex - " + vertexValueType +
- ", vertex combiner - " + classList.get(1));
- }
- }
- // If has vertex output format class, check
- Class<? extends VertexOutputFormat<I, V, E>>
- vertexOutputFormatClass =
- BspUtils.<I, V, E>getVertexOutputFormatClass(conf);
- if (vertexOutputFormatClass != null) {
- classList =
- ReflectionUtils.<VertexOutputFormat>getTypeArguments(
- VertexOutputFormat.class, vertexOutputFormatClass);
- if (classList.get(0) == null) {
- LOG.warn("Output format vertex index type is not known");
- } else if (!vertexIndexType.equals(classList.get(0))) {
- throw new IllegalArgumentException(
- "checkClassTypes: Vertex index types don't match, " +
- "vertex - " + vertexIndexType +
- ", vertex output format - " + classList.get(0));
- }
- if (classList.get(1) == null) {
- LOG.warn("Output format vertex value type is not known");
- } else if (!vertexValueType.equals(classList.get(1))) {
- throw new IllegalArgumentException(
- "checkClassTypes: Vertex value types don't match, " +
- "vertex - " + vertexValueType +
- ", vertex output format - " + classList.get(1));
- } if (classList.get(2) == null) {
- LOG.warn("Output format edge value type is not known");
- } else if (!edgeValueType.equals(classList.get(2))) {
- throw new IllegalArgumentException(
- "checkClassTypes: Edge value types don't match, " +
- "vertex - " + vertexIndexType +
- ", vertex output format - " + classList.get(2));
- }
- }
- // Vertex resolver might never select the types
- Class<? extends VertexResolver<I, V, E, M>>
- vertexResolverClass =
- BspUtils.<I, V, E, M>getVertexResolverClass(conf);
- classList = ReflectionUtils.<VertexResolver>getTypeArguments(
- VertexResolver.class, vertexResolverClass);
- if (classList.get(0) != null &&
- !vertexIndexType.equals(classList.get(0))) {
- throw new IllegalArgumentException(
- "checkClassTypes: Vertex index types don't match, " +
+ ", vertex combiner - " + classList.get(1));
+ }
+ }
+ // If has vertex output format class, check
+ Class<? extends VertexOutputFormat<I, V, E>>
+ vertexOutputFormatClass =
+ BspUtils.<I, V, E>getVertexOutputFormatClass(conf);
+ if (vertexOutputFormatClass != null) {
+ classList =
+ ReflectionUtils.<VertexOutputFormat>getTypeArguments(
+ VertexOutputFormat.class, vertexOutputFormatClass);
+ if (classList.get(0) == null) {
+ LOG.warn("Output format vertex index type is not known");
+ } else if (!vertexIndexType.equals(classList.get(0))) {
+ throw new IllegalArgumentException(
+ "checkClassTypes: Vertex index types don't match, " +
"vertex - " + vertexIndexType +
- ", vertex resolver - " + classList.get(0));
- }
- if (classList.get(1) != null &&
- !vertexValueType.equals(classList.get(1))) {
- throw new IllegalArgumentException(
- "checkClassTypes: Vertex value types don't match, " +
+ ", vertex output format - " + classList.get(0));
+ }
+ if (classList.get(1) == null) {
+ LOG.warn("Output format vertex value type is not known");
+ } else if (!vertexValueType.equals(classList.get(1))) {
+ throw new IllegalArgumentException(
+ "checkClassTypes: Vertex value types don't match, " +
"vertex - " + vertexValueType +
- ", vertex resolver - " + classList.get(1));
- }
- if (classList.get(2) != null &&
- !edgeValueType.equals(classList.get(2))) {
- throw new IllegalArgumentException(
- "checkClassTypes: Edge value types don't match, " +
- "vertex - " + edgeValueType +
- ", vertex resolver - " + classList.get(2));
- }
- if (classList.get(3) != null &&
- !messageValueType.equals(classList.get(3))) {
- throw new IllegalArgumentException(
- "checkClassTypes: Message value types don't match, " +
- "vertex - " + edgeValueType +
- ", vertex resolver - " + classList.get(3));
- }
- conf.setClass(GiraphJob.VERTEX_INDEX_CLASS,
- (Class<?>) vertexIndexType,
- WritableComparable.class);
- conf.setClass(GiraphJob.VERTEX_VALUE_CLASS,
- (Class<?>) vertexValueType,
- Writable.class);
- conf.setClass(GiraphJob.EDGE_VALUE_CLASS,
- (Class<?>) edgeValueType,
- Writable.class);
- conf.setClass(GiraphJob.MESSAGE_VALUE_CLASS,
- (Class<?>) messageValueType,
- Writable.class);
- }
-
- /**
- * Figure out what functions this mapper should do. Basic logic is as
- * follows:
- * 1) If not split master, everyone does the everything and/or running
- * ZooKeeper.
- * 2) If split master/worker, masters also run ZooKeeper (if it's not
- * given to us).
- *
- * @param conf Configuration to use
- * @return Functions that this mapper should do.
- */
- private static MapFunctions determineMapFunctions(
- Configuration conf,
- ZooKeeperManager zkManager) {
- boolean splitMasterWorker =
- conf.getBoolean(GiraphJob.SPLIT_MASTER_WORKER,
- GiraphJob.SPLIT_MASTER_WORKER_DEFAULT);
- int taskPartition = conf.getInt("mapred.task.partition", -1);
- boolean zkAlreadyProvided =
- conf.get(GiraphJob.ZOOKEEPER_LIST) != null;
- MapFunctions functions = MapFunctions.UNKNOWN;
- // What functions should this mapper do?
- if (!splitMasterWorker) {
- if ((zkManager != null) && zkManager.runsZooKeeper()) {
- functions = MapFunctions.ALL;
- } else {
- functions = MapFunctions.ALL_EXCEPT_ZOOKEEPER;
- }
+ ", vertex output format - " + classList.get(1));
+ }
+ if (classList.get(2) == null) {
+ LOG.warn("Output format edge value type is not known");
+ } else if (!edgeValueType.equals(classList.get(2))) {
+ throw new IllegalArgumentException(
+ "checkClassTypes: Edge value types don't match, " +
+ "vertex - " + edgeValueType +
+ ", vertex output format - " + classList.get(2));
+ }
+ }
+ // Vertex resolver might never select the types
+ Class<? extends VertexResolver<I, V, E, M>>
+ vertexResolverClass =
+ BspUtils.<I, V, E, M>getVertexResolverClass(conf);
+ classList = ReflectionUtils.<VertexResolver>getTypeArguments(
+ VertexResolver.class, vertexResolverClass);
+ if (classList.get(0) != null &&
+ !vertexIndexType.equals(classList.get(0))) {
+ throw new IllegalArgumentException(
+ "checkClassTypes: Vertex index types don't match, " +
+ "vertex - " + vertexIndexType +
+ ", vertex resolver - " + classList.get(0));
+ }
+ if (classList.get(1) != null &&
+ !vertexValueType.equals(classList.get(1))) {
+ throw new IllegalArgumentException(
+ "checkClassTypes: Vertex value types don't match, " +
+ "vertex - " + vertexValueType +
+ ", vertex resolver - " + classList.get(1));
+ }
+ if (classList.get(2) != null &&
+ !edgeValueType.equals(classList.get(2))) {
+ throw new IllegalArgumentException(
+ "checkClassTypes: Edge value types don't match, " +
+ "vertex - " + edgeValueType +
+ ", vertex resolver - " + classList.get(2));
+ }
+ if (classList.get(3) != null &&
+ !messageValueType.equals(classList.get(3))) {
+ throw new IllegalArgumentException(
+ "checkClassTypes: Message value types don't match, " +
+ "vertex - " + messageValueType +
+ ", vertex resolver - " + classList.get(3));
+ }
+ conf.setClass(GiraphJob.VERTEX_INDEX_CLASS,
+ (Class<?>) vertexIndexType,
+ WritableComparable.class);
+ conf.setClass(GiraphJob.VERTEX_VALUE_CLASS,
+ (Class<?>) vertexValueType,
+ Writable.class);
+ conf.setClass(GiraphJob.EDGE_VALUE_CLASS,
+ (Class<?>) edgeValueType,
+ Writable.class);
+ conf.setClass(GiraphJob.MESSAGE_VALUE_CLASS,
+ (Class<?>) messageValueType,
+ Writable.class);
+ }
+
+ /**
+ * Figure out what functions this mapper should do. Basic logic is as
+ * follows:
+ * 1) If not split master, everyone does everything and/or runs
+ * ZooKeeper.
+ * 2) If split master/worker, masters also run ZooKeeper (if it's not
+ * given to us).
+ *
+ * @param conf Configuration to use
+ * @param zkManager ZooKeeper manager to help determine whether to run
+ * ZooKeeper
+ * @return Functions that this mapper should do.
+ */
+ private static MapFunctions determineMapFunctions(
+ Configuration conf,
+ ZooKeeperManager zkManager) {
+ boolean splitMasterWorker =
+ conf.getBoolean(GiraphJob.SPLIT_MASTER_WORKER,
+ GiraphJob.SPLIT_MASTER_WORKER_DEFAULT);
+ int taskPartition = conf.getInt("mapred.task.partition", -1);
+ boolean zkAlreadyProvided =
+ conf.get(GiraphJob.ZOOKEEPER_LIST) != null;
+ MapFunctions functions = MapFunctions.UNKNOWN;
+ // What functions should this mapper do?
+ if (!splitMasterWorker) {
+ if ((zkManager != null) && zkManager.runsZooKeeper()) {
+ functions = MapFunctions.ALL;
+ } else {
+ functions = MapFunctions.ALL_EXCEPT_ZOOKEEPER;
+ }
+ } else {
+ if (zkAlreadyProvided) {
+ int masterCount =
+ conf.getInt(GiraphJob.ZOOKEEPER_SERVER_COUNT,
+ GiraphJob.ZOOKEEPER_SERVER_COUNT_DEFAULT);
+ if (taskPartition < masterCount) {
+ functions = MapFunctions.MASTER_ONLY;
} else {
- if (zkAlreadyProvided) {
- int masterCount =
- conf.getInt(GiraphJob.ZOOKEEPER_SERVER_COUNT,
- GiraphJob.ZOOKEEPER_SERVER_COUNT_DEFAULT);
- if (taskPartition < masterCount) {
- functions = MapFunctions.MASTER_ONLY;
- } else {
- functions = MapFunctions.WORKER_ONLY;
- }
- } else {
- if ((zkManager != null) && zkManager.runsZooKeeper()) {
- functions = MapFunctions.MASTER_ZOOKEEPER_ONLY;
- } else {
- functions = MapFunctions.WORKER_ONLY;
- }
- }
+ functions = MapFunctions.WORKER_ONLY;
}
- return functions;
- }
-
- @Override
- public void setup(Context context)
- throws IOException, InterruptedException {
- context.setStatus("setup: Beginning mapper setup.");
- graphState.setContext(context);
- // Setting the default handler for uncaught exceptions.
- Thread.setDefaultUncaughtExceptionHandler(
- new OverrideExceptionHandler());
- conf = context.getConfiguration();
- // Hadoop security needs this property to be set
- if (System.getenv("HADOOP_TOKEN_FILE_LOCATION") != null) {
- conf.set("mapreduce.job.credentials.binary",
- System.getenv("HADOOP_TOKEN_FILE_LOCATION"));
- }
- // Ensure the user classes have matching types and figure them out
- determineClassTypes(conf);
-
- // Do some initial setup (possibly starting up a Zookeeper service)
- context.setStatus("setup: Initializing Zookeeper services.");
- if (!conf.getBoolean(GiraphJob.LOCAL_TEST_MODE,
- GiraphJob.LOCAL_TEST_MODE_DEFAULT)) {
- Path[] fileClassPaths = DistributedCache.getLocalCacheArchives(conf);
- String zkClasspath = null;
- if(fileClassPaths == null) {
- if(LOG.isInfoEnabled()) {
- LOG.info("Distributed cache is empty. Assuming fatjar.");
- }
- String jarFile = context.getJar();
- if (jarFile == null) {
- jarFile = findContainingJar(getClass());
- }
- zkClasspath = jarFile.replaceFirst("file:", "");
- } else {
- StringBuilder sb = new StringBuilder();
- sb.append(fileClassPaths[0]);
-
- for (int i = 1; i < fileClassPaths.length; i++) {
- sb.append(":");
- sb.append(fileClassPaths[i]);
- }
- zkClasspath = sb.toString();
- }
-
- if (LOG.isInfoEnabled()) {
- LOG.info("setup: classpath @ " + zkClasspath);
- }
- conf.set(GiraphJob.ZOOKEEPER_JAR, zkClasspath);
- }
- String serverPortList =
- conf.get(GiraphJob.ZOOKEEPER_LIST, "");
- if (serverPortList == "") {
- zkManager = new ZooKeeperManager(context);
- context.setStatus("setup: Setting up Zookeeper manager.");
- zkManager.setup();
- if (zkManager.computationDone()) {
- done = true;
- return;
- }
- zkManager.onlineZooKeeperServers();
- serverPortList = zkManager.getZooKeeperServerPortString();
- }
- context.setStatus("setup: Connected to Zookeeper service " +
- serverPortList);
- this.mapFunctions = determineMapFunctions(conf, zkManager);
-
- // Sometimes it takes a while to get multiple ZooKeeper servers up
- if (conf.getInt(GiraphJob.ZOOKEEPER_SERVER_COUNT,
- GiraphJob.ZOOKEEPER_SERVER_COUNT_DEFAULT) > 1) {
- Thread.sleep(GiraphJob.DEFAULT_ZOOKEEPER_INIT_LIMIT *
- GiraphJob.DEFAULT_ZOOKEEPER_TICK_TIME);
- }
- int sessionMsecTimeout =
- conf.getInt(GiraphJob.ZOOKEEPER_SESSION_TIMEOUT,
- GiraphJob.ZOOKEEPER_SESSION_TIMEOUT_DEFAULT);
- try {
- if ((mapFunctions == MapFunctions.MASTER_ZOOKEEPER_ONLY) ||
- (mapFunctions == MapFunctions.MASTER_ONLY) ||
- (mapFunctions == MapFunctions.ALL) ||
- (mapFunctions == MapFunctions.ALL_EXCEPT_ZOOKEEPER)) {
- if (LOG.isInfoEnabled()) {
- LOG.info("setup: Starting up BspServiceMaster " +
- "(master thread)...");
- }
- masterThread =
- new MasterThread<I, V, E, M>(
- new BspServiceMaster<I, V, E, M>(serverPortList,
- sessionMsecTimeout,
- context,
- this),
- context);
- masterThread.start();
- }
- if ((mapFunctions == MapFunctions.WORKER_ONLY) ||
- (mapFunctions == MapFunctions.ALL) ||
- (mapFunctions == MapFunctions.ALL_EXCEPT_ZOOKEEPER)) {
- if (LOG.isInfoEnabled()) {
- LOG.info("setup: Starting up BspServiceWorker...");
- }
- serviceWorker = new BspServiceWorker<I, V, E, M>(
- serverPortList,
- sessionMsecTimeout,
- context,
- this,
- graphState);
- if (LOG.isInfoEnabled()) {
- LOG.info("setup: Registering health of this worker...");
- }
- serviceWorker.setup();
- }
- } catch (Exception e) {
- LOG.error("setup: Caught exception just before end of setup", e);
- if (zkManager != null ) {
- zkManager.offlineZooKeeperServers(
- ZooKeeperManager.State.FAILED);
- }
- throw new RuntimeException(
- "setup: Offlining servers due to exception...", e);
+ } else {
+ if ((zkManager != null) && zkManager.runsZooKeeper()) {
+ functions = MapFunctions.MASTER_ZOOKEEPER_ONLY;
+ } else {
+ functions = MapFunctions.WORKER_ONLY;
}
- context.setStatus(getMapFunctions().toString() + " starting...");
+ }
}
+ return functions;
+ }
- @Override
- public void map(Object key, Object value, Context context)
- throws IOException, InterruptedException {
- // map() only does computation
- // 1) Run checkpoint per frequency policy.
- // 2) For every vertex on this mapper, run the compute() function
- // 3) Wait until all messaging is done.
- // 4) Check if all vertices are done. If not goto 2).
- // 5) Dump output.
- if (done == true) {
- return;
- }
- if ((serviceWorker != null) && (graphState.getNumVertices() == 0)) {
- return;
- }
-
- if ((mapFunctions == MapFunctions.MASTER_ZOOKEEPER_ONLY) ||
- (mapFunctions == MapFunctions.MASTER_ONLY)) {
- if (LOG.isInfoEnabled()) {
- LOG.info("map: No need to do anything when not a worker");
- }
- return;
- }
-
- if (mapAlreadyRun) {
- throw new RuntimeException("In BSP, map should have only been" +
- " run exactly once, (already run)");
- }
- mapAlreadyRun = true;
-
- graphState.setSuperstep(serviceWorker.getSuperstep()).
- setContext(context).setGraphMapper(this);
-
- try {
- serviceWorker.getWorkerContext().preApplication();
- } catch (InstantiationException e) {
- LOG.fatal("map: preApplication failed in instantiation", e);
- throw new RuntimeException(
- "map: preApplication failed in instantiation", e);
- } catch (IllegalAccessException e) {
- LOG.fatal("map: preApplication failed in access", e);
- throw new RuntimeException(
- "map: preApplication failed in access",e );
- }
- context.progress();
-
- List<PartitionStats> partitionStatsList =
- new ArrayList<PartitionStats>();
- do {
- long superstep = serviceWorker.getSuperstep();
-
- graphState.setSuperstep(superstep);
-
- Collection<? extends PartitionOwner> masterAssignedPartitionOwners =
- serviceWorker.startSuperstep();
- if (zkManager != null && zkManager.runsZooKeeper()) {
- if (LOG.isInfoEnabled()) {
- LOG.info("map: Chosen to run ZooKeeper...");
- }
- context.setStatus("map: Running Zookeeper Server");
- }
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("map: " + MemoryUtils.getRuntimeMemoryStats());
- }
- context.progress();
-
- serviceWorker.exchangeVertexPartitions(
- masterAssignedPartitionOwners);
- context.progress();
-
- // Might need to restart from another superstep
- // (manually or automatic), or store a checkpoint
- if (serviceWorker.getRestartedSuperstep() == superstep) {
- if (LOG.isInfoEnabled()) {
- LOG.info("map: Loading from checkpoint " + superstep);
- }
- serviceWorker.loadCheckpoint(
- serviceWorker.getRestartedSuperstep());
- } else if (serviceWorker.checkpointFrequencyMet(superstep)) {
- serviceWorker.storeCheckpoint();
- }
-
- serviceWorker.getWorkerContext().setGraphState(graphState);
- serviceWorker.getWorkerContext().preSuperstep();
- context.progress();
-
- partitionStatsList.clear();
- for (Partition<I, V, E, M> partition :
- serviceWorker.getPartitionMap().values()) {
- PartitionStats partitionStats =
- new PartitionStats(partition.getPartitionId(), 0, 0, 0);
- for (BasicVertex<I, V, E, M> basicVertex :
- partition.getVertices()) {
- // Make sure every vertex has the current
- // graphState before computing
- basicVertex.setGraphState(graphState);
- if (basicVertex.isHalted()
- && !Iterables.isEmpty(basicVertex.getMessages())) {
- basicVertex.halt = false;
- }
- if (!basicVertex.isHalted()) {
- Iterator<M> vertexMsgIt =
- basicVertex.getMessages().iterator();
- context.progress();
- basicVertex.compute(vertexMsgIt);
- basicVertex.releaseResources();
- }
- if (basicVertex.isHalted()) {
- partitionStats.incrFinishedVertexCount();
- }
- partitionStats.incrVertexCount();
- partitionStats.addEdgeCount(basicVertex.getNumOutEdges());
- }
- partitionStatsList.add(partitionStats);
- }
- } while (!serviceWorker.finishSuperstep(partitionStatsList));
+ @Override
+ public void setup(Context context)
+ throws IOException, InterruptedException {
+ context.setStatus("setup: Beginning mapper setup.");
+ graphState.setContext(context);
+ // Setting the default handler for uncaught exceptions.
+ Thread.setDefaultUncaughtExceptionHandler(
+ new OverrideExceptionHandler());
+ conf = context.getConfiguration();
+ // Hadoop security needs this property to be set
+ if (System.getenv("HADOOP_TOKEN_FILE_LOCATION") != null) {
+ conf.set("mapreduce.job.credentials.binary",
+ System.getenv("HADOOP_TOKEN_FILE_LOCATION"));
+ }
+ // Ensure the user classes have matching types and figure them out
+ determineClassTypes(conf);
+
+ // Do some initial setup (possibly starting up a Zookeeper service)
+ context.setStatus("setup: Initializing Zookeeper services.");
+ if (!conf.getBoolean(GiraphJob.LOCAL_TEST_MODE,
+ GiraphJob.LOCAL_TEST_MODE_DEFAULT)) {
+ Path[] fileClassPaths = DistributedCache.getLocalCacheArchives(conf);
+ String zkClasspath = null;
+ if (fileClassPaths == null) {
if (LOG.isInfoEnabled()) {
- LOG.info("map: BSP application done " +
- "(global vertices marked done)");
+ LOG.info("Distributed cache is empty. Assuming fatjar.");
}
-
- serviceWorker.getWorkerContext().postApplication();
- context.progress();
- }
-
- @Override
- public void cleanup(Context context)
- throws IOException, InterruptedException {
+ String jarFile = context.getJar();
+ if (jarFile == null) {
+ jarFile = findContainingJar(getClass());
+ }
+ zkClasspath = jarFile.replaceFirst("file:", "");
+ } else {
+ StringBuilder sb = new StringBuilder();
+ sb.append(fileClassPaths[0]);
+
+ for (int i = 1; i < fileClassPaths.length; i++) {
+ sb.append(":");
+ sb.append(fileClassPaths[i]);
+ }
+ zkClasspath = sb.toString();
+ }
+
+ if (LOG.isInfoEnabled()) {
+ LOG.info("setup: classpath @ " + zkClasspath);
+ }
+ conf.set(GiraphJob.ZOOKEEPER_JAR, zkClasspath);
+ }
+ String serverPortList =
+ conf.get(GiraphJob.ZOOKEEPER_LIST, "");
+ if (serverPortList.isEmpty()) {
+ zkManager = new ZooKeeperManager(context);
+ context.setStatus("setup: Setting up Zookeeper manager.");
+ zkManager.setup();
+ if (zkManager.computationDone()) {
+ done = true;
+ return;
+ }
+ zkManager.onlineZooKeeperServers();
+ serverPortList = zkManager.getZooKeeperServerPortString();
+ }
+ context.setStatus("setup: Connected to Zookeeper service " +
+ serverPortList);
+ this.mapFunctions = determineMapFunctions(conf, zkManager);
+
+ // Sometimes it takes a while to get multiple ZooKeeper servers up
+ if (conf.getInt(GiraphJob.ZOOKEEPER_SERVER_COUNT,
+ GiraphJob.ZOOKEEPER_SERVER_COUNT_DEFAULT) > 1) {
+ Thread.sleep(GiraphJob.DEFAULT_ZOOKEEPER_INIT_LIMIT *
+ GiraphJob.DEFAULT_ZOOKEEPER_TICK_TIME);
+ }
+ int sessionMsecTimeout =
+ conf.getInt(GiraphJob.ZOOKEEPER_SESSION_TIMEOUT,
+ GiraphJob.ZOOKEEPER_SESSION_TIMEOUT_DEFAULT);
+ try {
+ if ((mapFunctions == MapFunctions.MASTER_ZOOKEEPER_ONLY) ||
+ (mapFunctions == MapFunctions.MASTER_ONLY) ||
+ (mapFunctions == MapFunctions.ALL) ||
+ (mapFunctions == MapFunctions.ALL_EXCEPT_ZOOKEEPER)) {
if (LOG.isInfoEnabled()) {
- LOG.info("cleanup: Starting for " + getMapFunctions());
+ LOG.info("setup: Starting up BspServiceMaster " +
+ "(master thread)...");
}
- if (done) {
- return;
+ masterThread =
+ new MasterThread<I, V, E, M>(
+ new BspServiceMaster<I, V, E, M>(serverPortList,
+ sessionMsecTimeout,
+ context,
+ this),
+ context);
+ masterThread.start();
+ }
+ if ((mapFunctions == MapFunctions.WORKER_ONLY) ||
+ (mapFunctions == MapFunctions.ALL) ||
+ (mapFunctions == MapFunctions.ALL_EXCEPT_ZOOKEEPER)) {
+ if (LOG.isInfoEnabled()) {
+ LOG.info("setup: Starting up BspServiceWorker...");
}
-
- if (serviceWorker != null) {
- serviceWorker.cleanup();
+ serviceWorker = new BspServiceWorker<I, V, E, M>(
+ serverPortList,
+ sessionMsecTimeout,
+ context,
+ this,
+ graphState);
+ if (LOG.isInfoEnabled()) {
+ LOG.info("setup: Registering health of this worker...");
}
- try {
- if (masterThread != null) {
- masterThread.join();
- }
- } catch (InterruptedException e) {
- // cleanup phase -- just log the error
- LOG.error("cleanup: Master thread couldn't join");
- }
- if (zkManager != null) {
- zkManager.offlineZooKeeperServers(
- ZooKeeperManager.State.FINISHED);
+ serviceWorker.setup();
+ }
+ } catch (IOException e) {
+ LOG.error("setup: Caught exception just before end of setup", e);
+ if (zkManager != null) {
+ zkManager.offlineZooKeeperServers(
+ ZooKeeperManager.State.FAILED);
+ }
+ throw new RuntimeException(
+ "setup: Offlining servers due to exception...", e);
+ }
+ context.setStatus(getMapFunctions().toString() + " starting...");
+ }
+
+ @Override
+ public void map(Object key, Object value, Context context)
+ throws IOException, InterruptedException {
+ // map() only does computation
+ // 1) Run checkpoint per frequency policy.
+ // 2) For every vertex on this mapper, run the compute() function
+ // 3) Wait until all messaging is done.
+ // 4) Check if all vertices are done. If not goto 2).
+ // 5) Dump output.
+ if (done) {
+ return;
+ }
+ if ((serviceWorker != null) && (graphState.getNumVertices() == 0)) {
+ return;
+ }
+
+ if ((mapFunctions == MapFunctions.MASTER_ZOOKEEPER_ONLY) ||
+ (mapFunctions == MapFunctions.MASTER_ONLY)) {
+ if (LOG.isInfoEnabled()) {
+ LOG.info("map: No need to do anything when not a worker");
+ }
+ return;
+ }
+
+ if (mapAlreadyRun) {
+ throw new RuntimeException("In BSP, map should have only been" +
+ " run exactly once, (already run)");
+ }
+ mapAlreadyRun = true;
+
+ graphState.setSuperstep(serviceWorker.getSuperstep()).
+ setContext(context).setGraphMapper(this);
+
+ try {
+ serviceWorker.getWorkerContext().preApplication();
+ } catch (InstantiationException e) {
+ LOG.fatal("map: preApplication failed in instantiation", e);
+ throw new RuntimeException(
+ "map: preApplication failed in instantiation", e);
+ } catch (IllegalAccessException e) {
+ LOG.fatal("map: preApplication failed in access", e);
+ throw new RuntimeException(
+ "map: preApplication failed in access", e);
+ }
+ context.progress();
+
+ List<PartitionStats> partitionStatsList =
+ new ArrayList<PartitionStats>();
+ do {
+ long superstep = serviceWorker.getSuperstep();
+
+ graphState.setSuperstep(superstep);
+
+ Collection<? extends PartitionOwner> masterAssignedPartitionOwners =
+ serviceWorker.startSuperstep();
+ if (zkManager != null && zkManager.runsZooKeeper()) {
+ if (LOG.isInfoEnabled()) {
+ LOG.info("map: Chosen to run ZooKeeper...");
}
- }
+ context.setStatus("map: Running Zookeeper Server");
+ }
- @Override
- public void run(Context context) throws IOException, InterruptedException {
- // Notify the master quicker if there is worker failure rather than
- // waiting for ZooKeeper to timeout and delete the ephemeral znodes
- try {
- setup(context);
- while (context.nextKeyValue()) {
- map(context.getCurrentKey(),
- context.getCurrentValue(),
- context);
- }
- cleanup(context);
- } catch (Exception e) {
- if (mapFunctions == MapFunctions.WORKER_ONLY) {
- serviceWorker.failureCleanup();
- }
- throw new IllegalStateException(
- "run: Caught an unrecoverable exception " + e.getMessage(), e);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("map: " + MemoryUtils.getRuntimeMemoryStats());
+ }
+ context.progress();
+
+ serviceWorker.exchangeVertexPartitions(
+ masterAssignedPartitionOwners);
+ context.progress();
+
+ // Might need to restart from another superstep
+ // (manually or automatic), or store a checkpoint
+ if (serviceWorker.getRestartedSuperstep() == superstep) {
+ if (LOG.isInfoEnabled()) {
+ LOG.info("map: Loading from checkpoint " + superstep);
}
+ serviceWorker.loadCheckpoint(
+ serviceWorker.getRestartedSuperstep());
+ } else if (serviceWorker.checkpointFrequencyMet(superstep)) {
+ serviceWorker.storeCheckpoint();
+ }
+
+ serviceWorker.getWorkerContext().setGraphState(graphState);
+ serviceWorker.getWorkerContext().preSuperstep();
+ context.progress();
+
+ partitionStatsList.clear();
+ for (Partition<I, V, E, M> partition :
+ serviceWorker.getPartitionMap().values()) {
+ PartitionStats partitionStats =
+ new PartitionStats(partition.getPartitionId(), 0, 0, 0);
+ for (BasicVertex<I, V, E, M> basicVertex :
+ partition.getVertices()) {
+ // Make sure every vertex has the current
+ // graphState before computing
+ basicVertex.setGraphState(graphState);
+ if (basicVertex.isHalted() &
+ !Iterables.isEmpty(basicVertex.getMessages())) {
+ basicVertex.halt = false;
+ }
+ if (!basicVertex.isHalted()) {
+ Iterator<M> vertexMsgIt =
+ basicVertex.getMessages().iterator();
+ context.progress();
+ basicVertex.compute(vertexMsgIt);
+ basicVertex.releaseResources();
+ }
+ if (basicVertex.isHalted()) {
+ partitionStats.incrFinishedVertexCount();
+ }
+ partitionStats.incrVertexCount();
+ partitionStats.addEdgeCount(basicVertex.getNumOutEdges());
+ }
+ partitionStatsList.add(partitionStats);
+ }
+ } while (!serviceWorker.finishSuperstep(partitionStatsList));
+ if (LOG.isInfoEnabled()) {
+ LOG.info("map: BSP application done " +
+ "(global vertices marked done)");
+ }
+
+ serviceWorker.getWorkerContext().postApplication();
+ context.progress();
+ }
+
+ @Override
+ public void cleanup(Context context)
+ throws IOException, InterruptedException {
+ if (LOG.isInfoEnabled()) {
+ LOG.info("cleanup: Starting for " + getMapFunctions());
+ }
+ if (done) {
+ return;
+ }
+
+ if (serviceWorker != null) {
+ serviceWorker.cleanup();
+ }
+ try {
+ if (masterThread != null) {
+ masterThread.join();
+ }
+ } catch (InterruptedException e) {
+ // cleanup phase -- just log the error
+ LOG.error("cleanup: Master thread couldn't join");
+ }
+ if (zkManager != null) {
+ zkManager.offlineZooKeeperServers(
+ ZooKeeperManager.State.FINISHED);
+ }
+ }
+
+ @Override
+ public void run(Context context) throws IOException, InterruptedException {
+ // Notify the master quicker if there is worker failure rather than
+ // waiting for ZooKeeper to timeout and delete the ephemeral znodes
+ try {
+ setup(context);
+ while (context.nextKeyValue()) {
+ map(context.getCurrentKey(),
+ context.getCurrentValue(),
+ context);
+ }
+ cleanup(context);
+ } catch (IOException e) {
+ if (mapFunctions == MapFunctions.WORKER_ONLY) {
+ serviceWorker.failureCleanup();
+ }
+ throw new IllegalStateException(
+ "run: Caught an unrecoverable exception " + e.getMessage(), e);
}
+ }
}
Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GraphState.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GraphState.java?rev=1245205&r1=1245204&r2=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GraphState.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GraphState.java Thu Feb 16 22:12:31 2012
@@ -22,84 +22,120 @@ import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Mapper;
-/*
+/**
* Global state of the graph. Should be treated as a singleton (but is kept
* as a regular bean to facilitate ease of unit testing)
*
- * @param <I> vertex id
- * @param <V> vertex data
- * @param <E> edge data
- * @param <M> message data
+ * @param <I> Vertex id
+ * @param <V> Vertex data
+ * @param <E> Edge data
+ * @param <M> Message data
*/
@SuppressWarnings("rawtypes")
public class GraphState<I extends WritableComparable, V extends Writable,
- E extends Writable, M extends Writable> {
- /** Graph-wide superstep */
- private long superstep = 0;
- /** Graph-wide number of vertices */
- private long numVertices = -1;
- /** Graph-wide number of edges */
- private long numEdges = -1;
- /** Graph-wide map context */
- private Mapper.Context context;
- /** Graph-wide BSP Mapper for this Vertex */
- private GraphMapper<I, V, E, M> graphMapper;
- /** Graph-wide worker communications */
- private WorkerCommunications<I, V, E, M> workerCommunications;
-
- public long getSuperstep() {
- return superstep;
- }
-
- public GraphState<I, V, E, M> setSuperstep(long superstep) {
- this.superstep = superstep;
- return this;
- }
-
- public long getNumVertices() {
- return numVertices;
- }
-
- public GraphState<I, V, E, M> setNumVertices(long numVertices) {
- this.numVertices = numVertices;
- return this;
- }
-
- public long getNumEdges() {
- return numEdges;
- }
-
- public GraphState<I, V, E, M> setNumEdges(long numEdges) {
- this.numEdges = numEdges;
- return this;
- }
-
- public Mapper.Context getContext() {
- return context;
- }
-
- public GraphState<I, V, E ,M> setContext(Mapper.Context context) {
- this.context = context;
- return this;
- }
-
- public GraphMapper<I, V, E, M> getGraphMapper() {
- return graphMapper;
- }
-
- public GraphState<I, V, E, M> setGraphMapper(
- GraphMapper<I, V, E, M> graphMapper) {
- this.graphMapper = graphMapper;
- return this;
- }
-
- public GraphState<I, V, E, M> setWorkerCommunications(
- WorkerCommunications<I, V, E, M> workerCommunications) {
- this.workerCommunications = workerCommunications;
- return this;
- }
-
- public WorkerCommunications<I, V, E, M> getWorkerCommunications() {
- return workerCommunications;
- }
+E extends Writable, M extends Writable> {
+ /** Graph-wide superstep */
+ private long superstep = 0;
+ /** Graph-wide number of vertices */
+ private long numVertices = -1;
+ /** Graph-wide number of edges */
+ private long numEdges = -1;
+ /** Graph-wide map context */
+ private Mapper.Context context;
+ /** Graph-wide BSP Mapper for this Vertex */
+ private GraphMapper<I, V, E, M> graphMapper;
+ /** Graph-wide worker communications */
+ private WorkerCommunications<I, V, E, M> workerCommunications;
+
+ public long getSuperstep() {
+ return superstep;
+ }
+
+ /**
+ * Set the current superstep.
+ *
+ * @param superstep Current superstep to use.
+ * @return Returns this object.
+ */
+ public GraphState<I, V, E, M> setSuperstep(long superstep) {
+ this.superstep = superstep;
+ return this;
+ }
+
+ public long getNumVertices() {
+ return numVertices;
+ }
+
+ /**
+ * Set the current number of vertices.
+ *
+ * @param numVertices Current number of vertices.
+ * @return Returns this object.
+ */
+ public GraphState<I, V, E, M> setNumVertices(long numVertices) {
+ this.numVertices = numVertices;
+ return this;
+ }
+
+ public long getNumEdges() {
+ return numEdges;
+ }
+
+ /**
+ * Set the current number of edges.
+ *
+ * @param numEdges Current number of edges.
+ * @return Returns this object.
+ */
+ public GraphState<I, V, E, M> setNumEdges(long numEdges) {
+ this.numEdges = numEdges;
+ return this;
+ }
+
+ public Mapper.Context getContext() {
+ return context;
+ }
+
+ /**
+ * Set the current context.
+ *
+ * @param context Current context.
+ * @return Returns this object.
+ */
+ public GraphState<I, V, E, M> setContext(Mapper.Context context) {
+ this.context = context;
+ return this;
+ }
+
+ public GraphMapper<I, V, E, M> getGraphMapper() {
+ return graphMapper;
+ }
+
+ /**
+ * Set the current graph mapper.
+ *
+ * @param graphMapper Current graph mapper.
+ * @return Returns this object.
+ */
+ public GraphState<I, V, E, M> setGraphMapper(
+ GraphMapper<I, V, E, M> graphMapper) {
+ this.graphMapper = graphMapper;
+ return this;
+ }
+
+ /**
+ * Set the current worker communications.
+ *
+ * @param workerCommunications Current worker communications.
+ * @return Returns this object.
+ */
+ public GraphState<I, V, E, M> setWorkerCommunications(
+ WorkerCommunications<I, V, E, M> workerCommunications) {
+ this.workerCommunications = workerCommunications;
+ return this;
+ }
+
+ public WorkerCommunications<I, V, E, M> getWorkerCommunications() {
+ return workerCommunications;
+ }
}
Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/HashMapVertex.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/HashMapVertex.java?rev=1245205&r1=1245204&r2=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/HashMapVertex.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/HashMapVertex.java Thu Feb 16 22:12:31 2012
@@ -48,196 +48,196 @@ import java.util.Map;
*/
@SuppressWarnings("rawtypes")
public abstract class HashMapVertex<I extends WritableComparable,
- V extends Writable, E extends Writable, M extends Writable>
- extends MutableVertex<I, V, E, M> {
- /** Class logger */
- private static final Logger LOG = Logger.getLogger(HashMapVertex.class);
- /** Vertex id */
- private I vertexId = null;
- /** Vertex value */
- private V vertexValue = null;
- /** Map of destination vertices and their edge values */
- protected final Map<I, Edge<I, E>> destEdgeMap =
- new HashMap<I, Edge<I, E>>();
- /** List of incoming messages from the previous superstep */
- private final List<M> msgList = Lists.newArrayList();
-
- @Override
- public void initialize(
- I vertexId, V vertexValue, Map<I, E> edges, Iterable<M> messages) {
- if (vertexId != null) {
- setVertexId(vertexId);
- }
- if (vertexValue != null) {
- setVertexValue(vertexValue);
- }
- if (edges != null && !edges.isEmpty()) {
- for (Map.Entry<I, E> entry : edges.entrySet()) {
- destEdgeMap.put(
- entry.getKey(),
- new Edge<I, E>(entry.getKey(), entry.getValue()));
- }
- }
- if (messages != null) {
- Iterables.<M>addAll(msgList, messages);
- }
- }
-
- @Override
- public final boolean addEdge(I targetVertexId, E edgeValue) {
- if (destEdgeMap.put(
- targetVertexId,
- new Edge<I, E>(targetVertexId, edgeValue)) != null) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("addEdge: Vertex=" + vertexId +
- ": already added an edge value for dest vertex id " +
- targetVertexId);
- }
- return false;
- } else {
- return true;
- }
- }
-
- @Override
- public long getSuperstep() {
- return getGraphState().getSuperstep();
- }
-
- @Override
- public final void setVertexId(I vertexId) {
- this.vertexId = vertexId;
- }
-
- @Override
- public final I getVertexId() {
- return vertexId;
- }
-
- @Override
- public final V getVertexValue() {
- return vertexValue;
- }
-
- @Override
- public final void setVertexValue(V vertexValue) {
- this.vertexValue = vertexValue;
- }
-
- @Override
- public E getEdgeValue(I targetVertexId) {
- Edge<I, E> edge = destEdgeMap.get(targetVertexId);
- return edge != null ? edge.getEdgeValue() : null;
- }
-
- @Override
- public boolean hasEdge(I targetVertexId) {
- return destEdgeMap.containsKey(targetVertexId);
- }
-
- /**
- * Get an iterator to the edges on this vertex.
- *
- * @return A <em>sorted</em> iterator, as defined by the sort-order
- * of the vertex ids
- */
- @Override
- public Iterator<I> iterator() {
- return destEdgeMap.keySet().iterator();
- }
-
- @Override
- public int getNumOutEdges() {
- return destEdgeMap.size();
- }
-
- @Override
- public E removeEdge(I targetVertexId) {
- Edge<I, E> edge = destEdgeMap.remove(targetVertexId);
- if (edge != null) {
- return edge.getEdgeValue();
- } else {
- return null;
- }
- }
-
- @Override
- public final void sendMsgToAllEdges(M msg) {
- if (msg == null) {
- throw new IllegalArgumentException(
- "sendMsgToAllEdges: Cannot send null message to all edges");
- }
- for (Edge<I, E> edge : destEdgeMap.values()) {
- sendMsg(edge.getDestVertexId(), msg);
- }
- }
-
- @Override
- final public void readFields(DataInput in) throws IOException {
- vertexId = BspUtils.<I>createVertexIndex(getConf());
- vertexId.readFields(in);
- boolean hasVertexValue = in.readBoolean();
- if (hasVertexValue) {
- vertexValue = BspUtils.<V>createVertexValue(getConf());
- vertexValue.readFields(in);
- }
- long edgeMapSize = in.readLong();
- for (long i = 0; i < edgeMapSize; ++i) {
- Edge<I, E> edge = new Edge<I, E>();
- edge.setConf(getConf());
- edge.readFields(in);
- addEdge(edge.getDestVertexId(), edge.getEdgeValue());
- }
- long msgListSize = in.readLong();
- for (long i = 0; i < msgListSize; ++i) {
- M msg = BspUtils.<M>createMessageValue(getConf());
- msg.readFields(in);
- msgList.add(msg);
- }
- halt = in.readBoolean();
- }
-
- @Override
- final public void write(DataOutput out) throws IOException {
- vertexId.write(out);
- out.writeBoolean(vertexValue != null);
- if (vertexValue != null) {
- vertexValue.write(out);
- }
- out.writeLong(destEdgeMap.size());
- for (Edge<I, E> edge : destEdgeMap.values()) {
- edge.write(out);
- }
- out.writeLong(msgList.size());
- for (M msg : msgList) {
- msg.write(out);
- }
- out.writeBoolean(halt);
- }
-
- @Override
- void putMessages(Iterable<M> messages) {
- msgList.clear();
- for (M message : messages) {
- msgList.add(message);
- }
- }
-
- @Override
- public Iterable<M> getMessages() {
- return Iterables.unmodifiableIterable(msgList);
- }
-
- @Override
- void releaseResources() {
- // Hint to GC to free the messages
- msgList.clear();
- }
-
- @Override
- public String toString() {
- return "Vertex(id=" + getVertexId() + ",value=" + getVertexValue() +
- ",#edges=" + destEdgeMap.size() + ")";
- }
+ V extends Writable, E extends Writable, M extends Writable>
+ extends MutableVertex<I, V, E, M> {
+ /** Class logger */
+ private static final Logger LOG = Logger.getLogger(HashMapVertex.class);
+ /** Map of destination vertices and their edge values */
+ protected final Map<I, Edge<I, E>> destEdgeMap =
+ new HashMap<I, Edge<I, E>>();
+ /** Vertex id */
+ private I vertexId = null;
+ /** Vertex value */
+ private V vertexValue = null;
+ /** List of incoming messages from the previous superstep */
+ private final List<M> msgList = Lists.newArrayList();
+
+ @Override
+ public void initialize(
+ I vertexId, V vertexValue, Map<I, E> edges, Iterable<M> messages) {
+ if (vertexId != null) {
+ setVertexId(vertexId);
+ }
+ if (vertexValue != null) {
+ setVertexValue(vertexValue);
+ }
+ if (edges != null && !edges.isEmpty()) {
+ for (Map.Entry<I, E> entry : edges.entrySet()) {
+ destEdgeMap.put(
+ entry.getKey(),
+ new Edge<I, E>(entry.getKey(), entry.getValue()));
+ }
+ }
+ if (messages != null) {
+ Iterables.<M>addAll(msgList, messages);
+ }
+ }
+
+ @Override
+ public final boolean addEdge(I targetVertexId, E edgeValue) {
+ if (destEdgeMap.put(
+ targetVertexId,
+ new Edge<I, E>(targetVertexId, edgeValue)) != null) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("addEdge: Vertex=" + vertexId +
+ ": already added an edge value for dest vertex id " +
+ targetVertexId);
+ }
+ return false;
+ } else {
+ return true;
+ }
+ }
+
+ @Override
+ public long getSuperstep() {
+ return getGraphState().getSuperstep();
+ }
+
+ @Override
+ public final void setVertexId(I vertexId) {
+ this.vertexId = vertexId;
+ }
+
+ @Override
+ public final I getVertexId() {
+ return vertexId;
+ }
+
+ @Override
+ public final V getVertexValue() {
+ return vertexValue;
+ }
+
+ @Override
+ public final void setVertexValue(V vertexValue) {
+ this.vertexValue = vertexValue;
+ }
+
+ @Override
+ public E getEdgeValue(I targetVertexId) {
+ Edge<I, E> edge = destEdgeMap.get(targetVertexId);
+ return edge != null ? edge.getEdgeValue() : null;
+ }
+
+ @Override
+ public boolean hasEdge(I targetVertexId) {
+ return destEdgeMap.containsKey(targetVertexId);
+ }
+
+ /**
+ * Get an iterator to the edges on this vertex.
+ *
+ * @return A <em>sorted</em> iterator, as defined by the sort-order
+ * of the vertex ids
+ */
+ @Override
+ public Iterator<I> iterator() {
+ return destEdgeMap.keySet().iterator();
+ }
+
+ @Override
+ public int getNumOutEdges() {
+ return destEdgeMap.size();
+ }
+
+ @Override
+ public E removeEdge(I targetVertexId) {
+ Edge<I, E> edge = destEdgeMap.remove(targetVertexId);
+ if (edge != null) {
+ return edge.getEdgeValue();
+ } else {
+ return null;
+ }
+ }
+
+ @Override
+ public final void sendMsgToAllEdges(M msg) {
+ if (msg == null) {
+ throw new IllegalArgumentException(
+ "sendMsgToAllEdges: Cannot send null message to all edges");
+ }
+ for (Edge<I, E> edge : destEdgeMap.values()) {
+ sendMsg(edge.getDestVertexId(), msg);
+ }
+ }
+
+ @Override
+ public final void readFields(DataInput in) throws IOException {
+ vertexId = BspUtils.<I>createVertexIndex(getConf());
+ vertexId.readFields(in);
+ boolean hasVertexValue = in.readBoolean();
+ if (hasVertexValue) {
+ vertexValue = BspUtils.<V>createVertexValue(getConf());
+ vertexValue.readFields(in);
+ }
+ long edgeMapSize = in.readLong();
+ for (long i = 0; i < edgeMapSize; ++i) {
+ Edge<I, E> edge = new Edge<I, E>();
+ edge.setConf(getConf());
+ edge.readFields(in);
+ addEdge(edge.getDestVertexId(), edge.getEdgeValue());
+ }
+ long msgListSize = in.readLong();
+ for (long i = 0; i < msgListSize; ++i) {
+ M msg = BspUtils.<M>createMessageValue(getConf());
+ msg.readFields(in);
+ msgList.add(msg);
+ }
+ halt = in.readBoolean();
+ }
+
+ @Override
+ public final void write(DataOutput out) throws IOException {
+ vertexId.write(out);
+ out.writeBoolean(vertexValue != null);
+ if (vertexValue != null) {
+ vertexValue.write(out);
+ }
+ out.writeLong(destEdgeMap.size());
+ for (Edge<I, E> edge : destEdgeMap.values()) {
+ edge.write(out);
+ }
+ out.writeLong(msgList.size());
+ for (M msg : msgList) {
+ msg.write(out);
+ }
+ out.writeBoolean(halt);
+ }
+
+ @Override
+ void putMessages(Iterable<M> messages) {
+ msgList.clear();
+ for (M message : messages) {
+ msgList.add(message);
+ }
+ }
+
+ @Override
+ public Iterable<M> getMessages() {
+ return Iterables.unmodifiableIterable(msgList);
+ }
+
+ @Override
+ void releaseResources() {
+ // Hint to GC to free the messages
+ msgList.clear();
+ }
+
+ @Override
+ public String toString() {
+ return "Vertex(id=" + getVertexId() + ",value=" + getVertexValue() +
+ ",#edges=" + destEdgeMap.size() + ")";
+ }
}
Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/IntIntNullIntVertex.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/IntIntNullIntVertex.java?rev=1245205&r1=1245204&r2=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/IntIntNullIntVertex.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/IntIntNullIntVertex.java Thu Feb 16 22:12:31 2012
@@ -30,135 +30,138 @@ import java.util.Iterator;
import java.util.Map;
/**
- * Simple implementation of {@link BasicVertex} using an int as id, value and message.
- * Edges are immutable and unweighted. This class aims to be as memory efficient as possible.
+ * Simple implementation of {@link BasicVertex} using an int as id, value and
+ * message. Edges are immutable and unweighted. This class aims to be as
+ * memory efficient as possible.
*/
public abstract class IntIntNullIntVertex extends
- BasicVertex<IntWritable, IntWritable, NullWritable,IntWritable> {
-
- private int id;
- private int value;
-
- private int[] neighbors;
- private int[] messages;
-
- @Override
- public void initialize(IntWritable vertexId, IntWritable vertexValue,
- Map<IntWritable, NullWritable> edges,
- Iterable<IntWritable> messages) {
- id = vertexId.get();
- value = vertexValue.get();
- this.neighbors = new int[edges.size()];
- int n = 0;
- for (IntWritable neighbor : edges.keySet()) {
- this.neighbors[n++] = neighbor.get();
- }
- this.messages = new int[Iterables.size(messages)];
- n = 0;
- for (IntWritable message : messages) {
- this.messages[n++] = message.get();
- }
- }
-
- @Override
- public IntWritable getVertexId() {
- return new IntWritable(id);
- }
-
- @Override
- public IntWritable getVertexValue() {
- return new IntWritable(value);
- }
-
- @Override
- public void setVertexValue(IntWritable vertexValue) {
- value = vertexValue.get();
- }
-
- @Override
- public Iterator<IntWritable> iterator() {
- return new UnmodifiableIntArrayIterator(neighbors);
- }
-
- @Override
- public NullWritable getEdgeValue(IntWritable targetVertexId) {
- return NullWritable.get();
- }
-
- @Override
- public boolean hasEdge(IntWritable targetVertexId) {
- for (int neighbor : neighbors) {
- if (neighbor == targetVertexId.get()) {
- return true;
- }
- }
- return false;
- }
-
- @Override
- public int getNumOutEdges() {
- return neighbors.length;
- }
-
- @Override
- public void sendMsgToAllEdges(final IntWritable message) {
- for (int neighbor : neighbors) {
- sendMsg(new IntWritable(neighbor), message);
- }
- }
-
- @Override
- public Iterable<IntWritable> getMessages() {
- return new Iterable<IntWritable>() {
- @Override
- public Iterator<IntWritable> iterator() {
- return new UnmodifiableIntArrayIterator(messages);
- }
- };
- }
-
- @Override
- public void putMessages(Iterable<IntWritable> newMessages) {
- messages = new int[Iterables.size(newMessages)];
- int n = 0;
- for (IntWritable message : newMessages) {
- messages[n++] = message.get();
- }
- }
-
- @Override
- void releaseResources() {
- messages = new int[0];
- }
-
- @Override
- public void write(final DataOutput out) throws IOException {
- out.writeInt(id);
- out.writeInt(value);
- out.writeInt(neighbors.length);
- for (int n = 0; n < neighbors.length; n++) {
- out.writeInt(neighbors[n]);
- }
- out.writeInt(messages.length);
- for (int n = 0; n < messages.length; n++) {
- out.writeInt(messages[n]);
- }
- }
-
- @Override
- public void readFields(DataInput in) throws IOException {
- id = in.readInt();
- value = in.readInt();
- int numEdges = in.readInt();
- neighbors = new int[numEdges];
- for (int n = 0; n < numEdges; n++) {
- neighbors[n] = in.readInt();
- }
- int numMessages = in.readInt();
- messages = new int[numMessages];
- for (int n = 0; n < numMessages; n++) {
- messages[n] = in.readInt();
- }
+ BasicVertex<IntWritable, IntWritable, NullWritable, IntWritable> {
+ /** Int represented vertex id */
+ private int id;
+ /** Int represented vertex value */
+ private int value;
+ /** Int array of neighbor vertex ids */
+ private int[] neighbors;
+ /** Int array of messages */
+ private int[] messages;
+
+ @Override
+ public void initialize(IntWritable vertexId, IntWritable vertexValue,
+ Map<IntWritable, NullWritable> edges,
+ Iterable<IntWritable> messages) {
+ id = vertexId.get();
+ value = vertexValue.get();
+ this.neighbors = new int[edges.size()];
+ int n = 0;
+ for (IntWritable neighbor : edges.keySet()) {
+ this.neighbors[n++] = neighbor.get();
+ }
+ this.messages = new int[Iterables.size(messages)];
+ n = 0;
+ for (IntWritable message : messages) {
+ this.messages[n++] = message.get();
+ }
+ }
+
+ @Override
+ public IntWritable getVertexId() {
+ return new IntWritable(id);
+ }
+
+ @Override
+ public IntWritable getVertexValue() {
+ return new IntWritable(value);
+ }
+
+ @Override
+ public void setVertexValue(IntWritable vertexValue) {
+ value = vertexValue.get();
+ }
+
+ @Override
+ public Iterator<IntWritable> iterator() {
+ return new UnmodifiableIntArrayIterator(neighbors);
+ }
+
+ @Override
+ public NullWritable getEdgeValue(IntWritable targetVertexId) {
+ return NullWritable.get();
+ }
+
+ @Override
+ public boolean hasEdge(IntWritable targetVertexId) {
+ for (int neighbor : neighbors) {
+ if (neighbor == targetVertexId.get()) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ @Override
+ public int getNumOutEdges() {
+ return neighbors.length;
+ }
+
+ @Override
+ public void sendMsgToAllEdges(final IntWritable message) {
+ for (int neighbor : neighbors) {
+ sendMsg(new IntWritable(neighbor), message);
+ }
+ }
+
+ @Override
+ public Iterable<IntWritable> getMessages() {
+ return new Iterable<IntWritable>() {
+ @Override
+ public Iterator<IntWritable> iterator() {
+ return new UnmodifiableIntArrayIterator(messages);
+ }
+ };
+ }
+
+ @Override
+ public void putMessages(Iterable<IntWritable> newMessages) {
+ messages = new int[Iterables.size(newMessages)];
+ int n = 0;
+ for (IntWritable message : newMessages) {
+ messages[n++] = message.get();
+ }
+ }
+
+ @Override
+ void releaseResources() {
+ messages = new int[0];
+ }
+
+ @Override
+ public void write(final DataOutput out) throws IOException {
+ out.writeInt(id);
+ out.writeInt(value);
+ out.writeInt(neighbors.length);
+ for (int n = 0; n < neighbors.length; n++) {
+ out.writeInt(neighbors[n]);
+ }
+ out.writeInt(messages.length);
+ for (int n = 0; n < messages.length; n++) {
+ out.writeInt(messages[n]);
+ }
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ id = in.readInt();
+ value = in.readInt();
+ int numEdges = in.readInt();
+ neighbors = new int[numEdges];
+ for (int n = 0; n < numEdges; n++) {
+ neighbors[n] = in.readInt();
+ }
+ int numMessages = in.readInt();
+ messages = new int[numMessages];
+ for (int n = 0; n < numMessages; n++) {
+ messages[n] = in.readInt();
}
+ }
}