You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by jg...@apache.org on 2012/06/28 21:36:32 UTC
svn commit: r1355128 - in /giraph/trunk: ./
src/main/java/org/apache/giraph/benchmark/
src/main/java/org/apache/giraph/bsp/
src/main/java/org/apache/giraph/examples/
src/main/java/org/apache/giraph/graph/ src/test/java/org/apache/giraph/
Author: jghoman
Date: Thu Jun 28 19:36:29 2012
New Revision: 1355128
URL: http://svn.apache.org/viewvc?rev=1355128&view=rev
Log:
GIRAPH-127: Extending the API with a master.compute() function. Contributed by Jan van der Lugt.
Added:
giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleMasterComputeVertex.java
giraph/trunk/src/main/java/org/apache/giraph/graph/DefaultMasterCompute.java
giraph/trunk/src/main/java/org/apache/giraph/graph/MasterCompute.java
Modified:
giraph/trunk/CHANGELOG
giraph/trunk/src/main/java/org/apache/giraph/benchmark/RandomMessageBenchmark.java
giraph/trunk/src/main/java/org/apache/giraph/bsp/CentralizedServiceMaster.java
giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceMaster.java
giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java
giraph/trunk/src/main/java/org/apache/giraph/graph/BspUtils.java
giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java
giraph/trunk/src/main/java/org/apache/giraph/graph/GlobalStats.java
giraph/trunk/src/main/java/org/apache/giraph/graph/GraphMapper.java
giraph/trunk/src/main/java/org/apache/giraph/graph/MasterThread.java
giraph/trunk/src/test/java/org/apache/giraph/TestBspBasic.java
Modified: giraph/trunk/CHANGELOG
URL: http://svn.apache.org/viewvc/giraph/trunk/CHANGELOG?rev=1355128&r1=1355127&r2=1355128&view=diff
==============================================================================
--- giraph/trunk/CHANGELOG (original)
+++ giraph/trunk/CHANGELOG Thu Jun 28 19:36:29 2012
@@ -2,6 +2,9 @@ Giraph Change Log
Release 0.2.0 - unreleased
+ GIRAPH-127: Extending the API with a master.compute() function.
+ (Jan van der Lugt via jghoman)
+
GIRAPH-220: Default implementation of BasicVertex#sendMsgToAllEdges().
(Alessandro Presta via jghoman)
Modified: giraph/trunk/src/main/java/org/apache/giraph/benchmark/RandomMessageBenchmark.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/benchmark/RandomMessageBenchmark.java?rev=1355128&r1=1355127&r2=1355128&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/benchmark/RandomMessageBenchmark.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/benchmark/RandomMessageBenchmark.java Thu Jun 28 19:36:29 2012
@@ -101,7 +101,6 @@ public class RandomMessageBenchmark impl
private long totalMessages = 0;
/** Total millis */
private long totalMillis = 0;
- /** Class logger */
@Override
public void preApplication()
Modified: giraph/trunk/src/main/java/org/apache/giraph/bsp/CentralizedServiceMaster.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/bsp/CentralizedServiceMaster.java?rev=1355128&r1=1355127&r2=1355128&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/bsp/CentralizedServiceMaster.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/bsp/CentralizedServiceMaster.java Thu Jun 28 19:36:29 2012
@@ -20,6 +20,7 @@ package org.apache.giraph.bsp;
import java.io.IOException;
+import org.apache.giraph.graph.AggregatorUsage;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.zookeeper.KeeperException;
@@ -36,7 +37,7 @@ import org.apache.zookeeper.KeeperExcept
@SuppressWarnings("rawtypes")
public interface CentralizedServiceMaster<
I extends WritableComparable, V extends Writable, E extends Writable,
- M extends Writable> extends CentralizedService<I, V, E, M> {
+ M extends Writable> extends CentralizedService<I, V, E, M>, AggregatorUsage {
/**
* Become the master.
* @return true if became the master, false if the application is done.
Added: giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleMasterComputeVertex.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleMasterComputeVertex.java?rev=1355128&view=auto
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleMasterComputeVertex.java (added)
+++ giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleMasterComputeVertex.java Thu Jun 28 19:36:29 2012
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.examples;
+
+import org.apache.giraph.aggregators.DoubleOverwriteAggregator;
+import org.apache.giraph.aggregators.IntOverwriteAggregator;
+import org.apache.giraph.graph.LongDoubleFloatDoubleVertex;
+import org.apache.giraph.graph.MasterCompute;
+import org.apache.giraph.graph.WorkerContext;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.log4j.Logger;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Iterator;
+
+/**
+ * Demonstrates a computation with a centralized part implemented via a
+ * MasterCompute.
+ */
+public class SimpleMasterComputeVertex extends LongDoubleFloatDoubleVertex {
+ /** Aggregator to get values from the master to the workers */
+ public static final String SMC_AGG = "simplemastercompute.aggregator";
+ /** Logger */
+ private static final Logger LOG =
+ Logger.getLogger(SimpleMasterComputeVertex.class);
+
+ /**
+ * Adds the value currently held by the {@link #SMC_AGG} aggregator to this
+ * vertex's running sum and reports the new sum to the worker context.
+ *
+ * @param msgIterator Incoming messages (not read by this example)
+ */
+ @Override
+ public void compute(Iterator<DoubleWritable> msgIterator) {
+ DoubleOverwriteAggregator agg =
+ (DoubleOverwriteAggregator) getAggregator(SMC_AGG);
+ // The running sum starts from zero in superstep 0; afterwards it is
+ // carried in the vertex value.
+ double oldSum = getSuperstep() == 0 ? 0 : getVertexValue().get();
+ double newValue = agg.getAggregatedValue().get();
+ double newSum = oldSum + newValue;
+ setVertexValue(new DoubleWritable(newSum));
+ SimpleMasterComputeWorkerContext workerContext =
+ (SimpleMasterComputeWorkerContext) getWorkerContext();
+ workerContext.setFinalSum(newSum);
+ LOG.info("Current sum: " + newSum);
+ }
+
+ /**
+ * Worker context used with {@link SimpleMasterComputeVertex}.
+ */
+ public static class SimpleMasterComputeWorkerContext
+ extends WorkerContext {
+ /** Final sum value for verification for local jobs */
+ private static double FINAL_SUM;
+
+ @Override
+ public void preApplication()
+ throws InstantiationException, IllegalAccessException {
+ // Register the same aggregator class that SimpleMasterCompute registers
+ // and that compute() casts to, so the cast is valid no matter which side
+ // registers the name first.
+ registerAggregator(SMC_AGG, DoubleOverwriteAggregator.class);
+ }
+
+ @Override
+ public void preSuperstep() {
+ useAggregator(SMC_AGG);
+ }
+
+ @Override
+ public void postSuperstep() {
+ }
+
+ @Override
+ public void postApplication() {
+ }
+
+ /**
+ * Remember the most recently computed sum. The value is stored in a
+ * static field, so it is shared by all instances in the same JVM.
+ *
+ * @param sum Latest sum computed by a vertex
+ */
+ public void setFinalSum(double sum) {
+ FINAL_SUM = sum;
+ }
+
+ /**
+ * Get the last sum stored through {@link #setFinalSum(double)}.
+ *
+ * @return Last stored sum
+ */
+ public static double getFinalSum() {
+ return FINAL_SUM;
+ }
+ }
+
+ /**
+ * MasterCompute used with {@link SimpleMasterComputeVertex}.
+ */
+ public static class SimpleMasterCompute
+ extends MasterCompute {
+ @Override
+ public void write(DataOutput out) throws IOException {
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ }
+
+ /**
+ * Feeds superstep / 2 + 1 into the {@link #SMC_AGG} aggregator and halts
+ * the computation once superstep 10 is reached.
+ */
+ @Override
+ public void compute() {
+ DoubleOverwriteAggregator agg =
+ (DoubleOverwriteAggregator) getAggregator(SMC_AGG);
+ agg.aggregate(((double) getSuperstep()) / 2 + 1);
+ if (getSuperstep() == 10) {
+ haltComputation();
+ }
+ }
+
+ @Override
+ public void initialize() throws InstantiationException,
+ IllegalAccessException {
+ registerAggregator(SMC_AGG, DoubleOverwriteAggregator.class);
+ }
+ }
+}
Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceMaster.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceMaster.java?rev=1355128&r1=1355127&r2=1355128&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceMaster.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceMaster.java Thu Jun 28 19:36:29 2012
@@ -63,7 +63,6 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.TreeMap;
import java.util.TreeSet;
import org.apache.giraph.graph.partition.MasterGraphPartitioner;
@@ -131,6 +130,8 @@ public class BspServiceMaster<I extends
new ArrayList<PartitionStats>();
/** Aggregator writer */
private AggregatorWriter aggregatorWriter;
+ /** Master class */
+ private MasterCompute masterCompute;
/**
* Constructor for setting up the master.
@@ -429,7 +430,7 @@ public class BspServiceMaster<I extends
partitionSet.add(workerInfo.getPartitionId());
}
for (int i = 1; i <= maxWorkers; ++i) {
- if (partitionSet.contains(new Integer(i))) {
+ if (partitionSet.contains(Integer.valueOf(i))) {
continue;
} else if (i == getTaskPartition()) {
continue;
@@ -633,6 +634,7 @@ public class BspServiceMaster<I extends
getZkExt().setData(mergedAggregatorPath, aggregatorZkData, -1);
}
}
+ masterCompute.readFields(finalizedStream);
finalizedStream.close();
Map<Integer, PartitionOwner> idOwnerMap =
@@ -748,6 +750,8 @@ public class BspServiceMaster<I extends
currentMasterTaskPartitionCounter.increment(
getTaskPartition() -
currentMasterTaskPartitionCounter.getValue());
+ masterCompute =
+ BspUtils.createMasterCompute(getConfiguration());
aggregatorWriter =
BspUtils.createAggregatorWriter(getConfiguration());
try {
@@ -843,18 +847,11 @@ public class BspServiceMaster<I extends
}
/**
- * Get the aggregator values for a particular superstep,
- * aggregate and save them. Does nothing on the INPUT_SUPERSTEP.
+ * Get the aggregator values for a particular superstep and aggregate them.
*
* @param superstep superstep to check
*/
private void collectAndProcessAggregatorValues(long superstep) {
- if (superstep == INPUT_SUPERSTEP) {
- // Nothing to collect on the input superstep
- return;
- }
- Map<String, Aggregator<? extends Writable>> aggregatorMap =
- new TreeMap<String, Aggregator<? extends Writable>>();
String workerFinishedPath =
getWorkerFinishedPath(getApplicationAttempt(), superstep);
List<String> hostnameIdPathList = null;
@@ -913,23 +910,16 @@ public class BspServiceMaster<I extends
AGGREGATOR_CLASS_NAME_KEY);
@SuppressWarnings("unchecked")
Aggregator<Writable> aggregator =
- (Aggregator<Writable>) aggregatorMap.get(aggregatorName);
+ (Aggregator<Writable>) getAggregator(aggregatorName);
boolean firstTime = false;
if (aggregator == null) {
@SuppressWarnings("unchecked")
- Aggregator<Writable> aggregatorWritable =
- (Aggregator<Writable>) getAggregator(aggregatorName);
- aggregator = aggregatorWritable;
- if (aggregator == null) {
- @SuppressWarnings("unchecked")
- Class<? extends Aggregator<Writable>> aggregatorClass =
- (Class<? extends Aggregator<Writable>>)
- Class.forName(aggregatorClassName);
- aggregator = registerAggregator(
- aggregatorName,
- aggregatorClass);
- }
- aggregatorMap.put(aggregatorName, aggregator);
+ Class<? extends Aggregator<Writable>> aggregatorClass =
+ (Class<? extends Aggregator<Writable>>)
+ Class.forName(aggregatorClassName);
+ aggregator = registerAggregator(
+ aggregatorName,
+ aggregatorClass);
firstTime = true;
}
Writable aggregatorValue =
@@ -979,12 +969,21 @@ public class BspServiceMaster<I extends
}
}
}
+ }
+
+ /**
+ * Save the supplied aggregator values.
+ *
+ * @param superstep superstep for which to save values
+ */
+ private void saveAggregatorValues(long superstep) {
+ Map<String, Aggregator<Writable>> aggregatorMap = getAggregatorMap();
if (aggregatorMap.size() > 0) {
String mergedAggregatorPath =
getMergedAggregatorPath(getApplicationAttempt(), superstep);
byte [] zkData = null;
JSONArray aggregatorArray = new JSONArray();
- for (Map.Entry<String, Aggregator<? extends Writable>> entry :
+ for (Map.Entry<String, Aggregator<Writable>> entry :
aggregatorMap.entrySet()) {
try {
ByteArrayOutputStream outputStream =
@@ -1000,7 +999,7 @@ public class BspServiceMaster<I extends
Base64.encodeBytes(outputStream.toByteArray()));
aggregatorArray.put(aggregatorObj);
if (LOG.isInfoEnabled()) {
- LOG.info("collectAndProcessAggregatorValues: " +
+ LOG.info("saveAggregatorValues: " +
"Trying to add aggregatorObj " +
aggregatorObj + "(" +
entry.getValue().getAggregatedValue() +
@@ -1009,11 +1008,11 @@ public class BspServiceMaster<I extends
}
} catch (IOException e) {
throw new IllegalStateException(
- "collectAndProcessAggregatorValues: " +
+ "saveAggregatorValues: " +
"IllegalStateException", e);
} catch (JSONException e) {
throw new IllegalStateException(
- "collectAndProcessAggregatorValues: JSONException", e);
+ "saveAggregatorValues: JSONException", e);
}
}
try {
@@ -1024,18 +1023,18 @@ public class BspServiceMaster<I extends
CreateMode.PERSISTENT,
true);
} catch (KeeperException.NodeExistsException e) {
- LOG.warn("collectAndProcessAggregatorValues: " +
+ LOG.warn("saveAggregatorValues: " +
mergedAggregatorPath + " already exists!");
} catch (KeeperException e) {
throw new IllegalStateException(
- "collectAndProcessAggregatorValues: KeeperException", e);
+ "saveAggregatorValues: KeeperException", e);
} catch (InterruptedException e) {
throw new IllegalStateException(
- "collectAndProcessAggregatorValues: IllegalStateException",
+ "saveAggregatorValues: IllegalStateException",
e);
}
if (LOG.isInfoEnabled()) {
- LOG.info("collectAndProcessAggregatorValues: Finished " +
+ LOG.info("saveAggregatorValues: Finished " +
"loading " +
mergedAggregatorPath + " with aggregator values " +
aggregatorArray);
@@ -1090,6 +1089,7 @@ public class BspServiceMaster<I extends
} else {
finalizedOutputStream.writeInt(0);
}
+ masterCompute.write(finalizedOutputStream);
finalizedOutputStream.close();
lastCheckpointedSuperstep = superstep;
lastCheckpointedSuperstepCounter.increment(superstep -
@@ -1221,10 +1221,10 @@ public class BspServiceMaster<I extends
getZkExt().deleteExt(inputSplitsPath, -1, true);
} catch (InterruptedException e) {
throw new RuntimeException(
- "retartFromCheckpoint: InterruptedException", e);
+ "restartFromCheckpoint: InterruptedException", e);
} catch (KeeperException e) {
throw new RuntimeException(
- "retartFromCheckpoint: KeeperException", e);
+ "restartFromCheckpoint: KeeperException", e);
}
setApplicationAttempt(getApplicationAttempt() + 1);
setCachedSuperstep(checkpoint);
@@ -1378,7 +1378,6 @@ public class BspServiceMaster<I extends
return true;
}
-
@Override
public SuperstepState coordinateSuperstep() throws
KeeperException, InterruptedException {
@@ -1389,6 +1388,7 @@ public class BspServiceMaster<I extends
// 4. Collect and process aggregators
// 5. Create superstep finished node
// 6. If the checkpoint frequency is met, finalize the checkpoint
+
List<WorkerInfo> chosenWorkerInfoList = checkWorkers();
if (chosenWorkerInfoList == null) {
LOG.fatal("coordinateSuperstep: Not enough healthy workers for " +
@@ -1449,8 +1449,21 @@ public class BspServiceMaster<I extends
return SuperstepState.WORKER_FAILURE;
}
+ // Collect aggregator values, then run the master.compute() and
+ // finally save the aggregator values
collectAndProcessAggregatorValues(getSuperstep());
+ runMasterCompute(getSuperstep());
+ saveAggregatorValues(getSuperstep());
+
+ // If the master is halted or all the vertices voted to halt and there
+ // are no more messages in the system, stop the computation
GlobalStats globalStats = aggregateWorkerStats(getSuperstep());
+ if (masterCompute.isHalted() ||
+ (globalStats.getFinishedVertexCount() ==
+ globalStats.getVertexCount() &&
+ globalStats.getMessageCount() == 0)) {
+ globalStats.setHaltComputation(true);
+ }
// Let everyone know the aggregated application state through the
// superstep finishing znode.
@@ -1515,9 +1528,7 @@ public class BspServiceMaster<I extends
superstepCounter.increment(1);
}
SuperstepState superstepState;
- if ((globalStats.getFinishedVertexCount() ==
- globalStats.getVertexCount()) &&
- globalStats.getMessageCount() == 0) {
+ if (globalStats.getHaltComputation()) {
superstepState = SuperstepState.ALL_SUPERSTEPS_DONE;
} else {
superstepState = SuperstepState.THIS_SUPERSTEP_DONE;
@@ -1536,6 +1547,37 @@ public class BspServiceMaster<I extends
}
/**
+ * Run master.compute(), calling initialize() first on the input superstep
+ *
+ * @param superstep superstep for which to run the master.compute()
+ */
+ private void runMasterCompute(long superstep) {
+ GraphState<I, V, E, M> graphState = getGraphMapper().getGraphState();
+ // The master.compute() should run logically before the workers, so
+ // increase the superstep counter it uses by one
+ graphState.setSuperstep(superstep + 1);
+ graphState.setNumVertices(vertexCounter.getValue());
+ graphState.setNumEdges(edgeCounter.getValue());
+ graphState.setContext(getContext());
+ graphState.setGraphMapper(getGraphMapper());
+ masterCompute.setGraphState(graphState);
+ if (superstep == INPUT_SUPERSTEP) {
+ try {
+ masterCompute.initialize();
+ } catch (InstantiationException e) {
+ LOG.fatal("map: MasterCompute.initialize failed in instantiation", e);
+ throw new RuntimeException(
+ "map: MasterCompute.initialize failed in instantiation", e);
+ } catch (IllegalAccessException e) {
+ LOG.fatal("map: MasterCompute.initialize failed in access", e);
+ throw new RuntimeException(
+ "map: MasterCompute.initialize failed in access", e);
+ }
+ }
+ masterCompute.compute();
+ }
+
+ /**
* Need to clean up ZooKeeper nicely. Make sure all the masters and workers
* have reported ending their ZooKeeper connections.
*/
@@ -1744,4 +1786,15 @@ public class BspServiceMaster<I extends
return foundEvent;
}
+
+ /**
+ * Use an aggregator in this superstep. Note that the master uses all
+ * aggregators by default, so calling this function is not necessary.
+ *
+ * @param name Name of aggregator (should be unique)
+ * @return boolean (always true)
+ */
+ public boolean useAggregator(String name) {
+ return true; // nothing to enable: the name argument is not consulted
+ }
}
Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java?rev=1355128&r1=1355127&r2=1355128&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java Thu Jun 28 19:36:29 2012
@@ -745,9 +745,6 @@ public class BspServiceWorker<I extends
* @param superstep Superstep to get the aggregated values from
*/
private void getAggregatorValues(long superstep) {
- if (superstep <= (INPUT_SUPERSTEP + 1)) {
- return;
- }
String mergedAggregatorPath =
getMergedAggregatorPath(getApplicationAttempt(), superstep - 1);
JSONArray aggregatorArray = null;
@@ -1061,9 +1058,7 @@ public class BspServiceWorker<I extends
getGraphMapper().getGraphState().
setNumEdges(globalStats.getEdgeCount()).
setNumVertices(globalStats.getVertexCount());
- return (globalStats.getFinishedVertexCount() ==
- globalStats.getVertexCount()) &&
- (globalStats.getMessageCount() == 0);
+ return globalStats.getHaltComputation();
}
/**
Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/BspUtils.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/BspUtils.java?rev=1355128&r1=1355127&r2=1355128&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/BspUtils.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/BspUtils.java Thu Jun 28 19:36:29 2012
@@ -325,6 +325,34 @@ public class BspUtils {
return workerContext;
}
+ /**
+ * Get the user's subclassed {@link MasterCompute}
+ *
+ * @param conf Configuration to check
+ * @return User's master class
+ */
+ public static Class<? extends MasterCompute>
+ getMasterComputeClass(Configuration conf) {
+ // DefaultMasterCompute is passed as the default for the lookup.
+ Class<? extends MasterCompute> masterComputeClass =
+ (Class<? extends MasterCompute>) conf.getClass(
+ GiraphJob.MASTER_COMPUTE_CLASS,
+ DefaultMasterCompute.class,
+ MasterCompute.class);
+ return masterComputeClass;
+ }
+
+ /**
+ * Create a user master
+ *
+ * @param conf Configuration to check
+ * @return Instantiated user master
+ */
+ public static MasterCompute
+ createMasterCompute(Configuration conf) {
+ // Resolve the configured class first, then let ReflectionUtils build the
+ // instance; the configuration is handed to that helper as well.
+ Class<? extends MasterCompute> userClass = getMasterComputeClass(conf);
+ return ReflectionUtils.newInstance(userClass, conf);
+ }
/**
* Get the user's subclassed {@link BasicVertex}
Added: giraph/trunk/src/main/java/org/apache/giraph/graph/DefaultMasterCompute.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/DefaultMasterCompute.java?rev=1355128&view=auto
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/DefaultMasterCompute.java (added)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/DefaultMasterCompute.java Thu Jun 28 19:36:29 2012
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.graph;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+/**
+ * A dumb implementation of {@link MasterCompute}. This is the default
+ * implementation when no MasterCompute is defined by the user. It does
+ * nothing.
+ */
+
+public class DefaultMasterCompute extends MasterCompute {
+ // Every hook is deliberately empty: this class only stands in when the
+ // job configures no MasterCompute of its own.
+
+ @Override
+ public void compute() {
+ }
+
+ @Override
+ public void initialize() throws InstantiationException,
+ IllegalAccessException {
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ }
+}
Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java?rev=1355128&r1=1355127&r2=1355128&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java Thu Jun 28 19:36:29 2012
@@ -44,6 +44,9 @@ public class GiraphJob {
public static final String VERTEX_INPUT_FORMAT_CLASS =
"giraph.vertexInputFormatClass";
+ /** Class for Master - optional */
+ public static final String MASTER_COMPUTE_CLASS = "giraph.masterComputeClass";
+
/** VertexOutputFormat class - optional */
public static final String VERTEX_OUTPUT_FORMAT_CLASS =
"giraph.vertexOutputFormatClass";
@@ -464,6 +467,16 @@ public class GiraphJob {
}
/**
+ * Set the master class (optional)
+ *
+ * @param masterComputeClass Runs master computation
+ */
+ public final void setMasterComputeClass(Class<?> masterComputeClass) {
+ // Stored under MASTER_COMPUTE_CLASS, with MasterCompute as the interface.
+ getConfiguration().setClass(MASTER_COMPUTE_CLASS, masterComputeClass,
+ MasterCompute.class);
+ }
+
+ /**
* Set the vertex output format class (optional)
*
* @param vertexOutputFormatClass Determines how graph is output
Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/GlobalStats.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/GlobalStats.java?rev=1355128&r1=1355127&r2=1355128&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/GlobalStats.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/GlobalStats.java Thu Jun 28 19:36:29 2012
@@ -37,6 +37,8 @@ public class GlobalStats implements Writ
private long edgeCount = 0;
/** All messages sent in the last superstep */
private long messageCount = 0;
+ /** Whether the computation should be halted */
+ private boolean haltComputation = false;
/**
* Add the stats of a partition to the global stats.
@@ -65,6 +67,14 @@ public class GlobalStats implements Writ
return messageCount;
}
+ public boolean getHaltComputation() {
+ return haltComputation; // whether the computation should be halted
+ }
+
+ public void setHaltComputation(boolean value) {
+ haltComputation = value; // serialized along with the other stats
+ }
+
/**
* Add messages to the global stats.
*
@@ -80,6 +90,7 @@ public class GlobalStats implements Writ
finishedVertexCount = input.readLong();
edgeCount = input.readLong();
messageCount = input.readLong();
+ haltComputation = input.readBoolean();
}
@Override
@@ -88,12 +99,13 @@ public class GlobalStats implements Writ
output.writeLong(finishedVertexCount);
output.writeLong(edgeCount);
output.writeLong(messageCount);
+ output.writeBoolean(haltComputation);
}
@Override
public String toString() {
return "(vtx=" + vertexCount + ",finVtx=" +
finishedVertexCount + ",edges=" + edgeCount + ",msgCount=" +
- messageCount + ")";
+ messageCount + ",haltComputation=" + haltComputation + ")";
}
}
Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/GraphMapper.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/GraphMapper.java?rev=1355128&r1=1355127&r2=1355128&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/GraphMapper.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/GraphMapper.java Thu Jun 28 19:36:29 2012
@@ -19,6 +19,8 @@
package org.apache.giraph.graph;
import com.google.common.collect.Iterables;
+
+import org.apache.giraph.bsp.CentralizedServiceMaster;
import org.apache.giraph.bsp.CentralizedServiceWorker;
import org.apache.giraph.graph.partition.Partition;
import org.apache.giraph.graph.partition.PartitionOwner;
@@ -62,6 +64,8 @@ public class GraphMapper<I extends Writa
private static final Logger LOG = Logger.getLogger(GraphMapper.class);
/** Coordination service worker */
private CentralizedServiceWorker<I, V, E, M> serviceWorker;
+ /** Coordination service master */
+ private CentralizedServiceMaster<I, V, E, M> serviceMaster;
/** Coordination service master thread */
private Thread masterThread = null;
/** The map should be run exactly once, or else there is a problem. */
@@ -111,7 +115,14 @@ public class GraphMapper<I extends Writa
* @return Aggregator usage interface
*/
public final AggregatorUsage getAggregatorUsage() {
- return serviceWorker;
+ AggregatorUsage result = null;
+ if (serviceWorker != null) {
+ result = serviceWorker;
+ }
+ if (serviceMaster != null) {
+ result = serviceMaster;
+ }
+ return result;
}
public final WorkerContext getWorkerContext() {
@@ -446,13 +457,12 @@ public class GraphMapper<I extends Writa
LOG.info("setup: Starting up BspServiceMaster " +
"(master thread)...");
}
- masterThread =
- new MasterThread<I, V, E, M>(
- new BspServiceMaster<I, V, E, M>(serverPortList,
- sessionMsecTimeout,
- context,
- this),
- context);
+ serviceMaster = new BspServiceMaster<I, V, E, M>(serverPortList,
+ sessionMsecTimeout,
+ context,
+ this);
+ masterThread = new MasterThread<I, V, E, M>(
+ (BspServiceMaster<I, V, E, M>) serviceMaster, context);
masterThread.start();
}
if ((mapFunctions == MapFunctions.WORKER_ONLY) ||
Added: giraph/trunk/src/main/java/org/apache/giraph/graph/MasterCompute.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/MasterCompute.java?rev=1355128&view=auto
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/MasterCompute.java (added)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/MasterCompute.java Thu Jun 28 19:36:29 2012
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.graph;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.Mapper;
+
+/**
+ * Base class for defining a master vertex that can perform centralized
+ * computation between supersteps. This class will be instantiated on the
+ * master node and will run every superstep before the workers do.
+ *
+ * Communication with the workers should be performed via aggregators. The
+ * values of the aggregators are broadcast to the workers before
+ * vertex.compute() is called and collected by the master before
+ * master.compute() is called. This means aggregator values used by the workers
+ * are consistent with aggregator values from the master from the same
+ * superstep, and aggregator values used by the master are consistent with
+ * aggregator values from the workers from the previous superstep. Note that
+ * the master has to register its own aggregators (it does not call
+ * {@link WorkerContext} functions), but it uses all aggregators by default,
+ * so useAggregator does not have to be called.
+ */
+@SuppressWarnings("rawtypes")
+public abstract class MasterCompute implements AggregatorUsage, Writable,
+ Configurable {
+ /** True once haltComputation() has been called; read by isHalted(). */
+ protected boolean halt = false;
+ /** Global graph state */
+ private GraphState graphState;
+ /** Configuration */
+ private Configuration conf;
+
+ /**
+ * Must be defined by user to specify what the master has to do.
+ */
+ public abstract void compute();
+
+ /**
+ * Initialize the MasterCompute class; this is the place to register
+ * aggregators. Declares the same exceptions as registerAggregator.
+ */
+ public abstract void initialize() throws InstantiationException,
+ IllegalAccessException;
+
+ /**
+ * Retrieves the current superstep.
+ *
+ * @return Current superstep
+ */
+ public long getSuperstep() {
+ return getGraphState().getSuperstep();
+ }
+
+ /**
+ * Get the total (all workers) number of vertices that
+ * existed in the previous superstep.
+ *
+ * @return Total number of vertices (-1 if first superstep)
+ */
+ public long getNumVertices() {
+ return getGraphState().getNumVertices();
+ }
+
+ /**
+ * Get the total (all workers) number of edges that
+ * existed in the previous superstep.
+ *
+ * @return Total number of edges (-1 if first superstep)
+ */
+ public long getNumEdges() {
+ return getGraphState().getNumEdges();
+ }
+
+ /**
+ * After this is called, the computation will stop, even if there are
+ * still messages in the system or vertices that have not voted to halt.
+ */
+ public void haltComputation() {
+ halt = true;
+ }
+
+ /**
+ * Has the master halted?
+ *
+ * @return True if halted, false otherwise.
+ */
+ public boolean isHalted() {
+ return halt;
+ }
+
+ /**
+ * Get the graph state for all workers.
+ *
+ * @return Graph state for all workers
+ */
+ GraphState getGraphState() {
+ return graphState;
+ }
+
+ /**
+ * Set the graph state for all workers.
+ *
+ * @param graphState Graph state for all workers
+ */
+ void setGraphState(GraphState graphState) {
+ this.graphState = graphState;
+ }
+
+ /**
+ * Get the mapper context held by the graph state.
+ *
+ * @return Mapper context
+ */
+ public Mapper.Context getContext() {
+ return getGraphState().getContext();
+ }
+
+ @Override
+ public final <A extends Writable> Aggregator<A> registerAggregator(
+ String name, Class<? extends Aggregator<A>> aggregatorClass)
+ throws InstantiationException, IllegalAccessException {
+ return getGraphState().getGraphMapper().getAggregatorUsage().
+ registerAggregator(name, aggregatorClass);
+ }
+
+ @Override
+ public final Aggregator<? extends Writable> getAggregator(String name) {
+ return getGraphState().getGraphMapper().getAggregatorUsage().
+ getAggregator(name);
+ }
+
+ @Override
+ public final boolean useAggregator(String name) {
+ return getGraphState().getGraphMapper().getAggregatorUsage().
+ useAggregator(name);
+ }
+
+ @Override
+ public Configuration getConf() {
+ return conf;
+ }
+
+ @Override
+ public void setConf(Configuration conf) {
+ this.conf = conf;
+ }
+}
Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/MasterThread.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/MasterThread.java?rev=1355128&r1=1355127&r2=1355128&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/MasterThread.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/MasterThread.java Thu Jun 28 19:36:29 2012
@@ -111,7 +111,7 @@ public class MasterThread<I extends Writ
superstepState = bspServiceMaster.coordinateSuperstep();
long superstepMillis = System.currentTimeMillis() -
startSuperstepMillis;
- superstepSecsMap.put(new Long(cachedSuperstep),
+ superstepSecsMap.put(Long.valueOf(cachedSuperstep),
superstepMillis / 1000.0d);
if (LOG.isInfoEnabled()) {
LOG.info("masterThread: Coordination of superstep " +
Modified: giraph/trunk/src/test/java/org/apache/giraph/TestBspBasic.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/test/java/org/apache/giraph/TestBspBasic.java?rev=1355128&r1=1355127&r2=1355128&view=diff
==============================================================================
--- giraph/trunk/src/test/java/org/apache/giraph/TestBspBasic.java (original)
+++ giraph/trunk/src/test/java/org/apache/giraph/TestBspBasic.java Thu Jun 28 19:36:29 2012
@@ -32,6 +32,7 @@ import org.apache.giraph.aggregators.Lon
import org.apache.giraph.examples.GeneratedVertexReader;
import org.apache.giraph.examples.SimpleCombinerVertex;
import org.apache.giraph.examples.SimpleFailVertex;
+import org.apache.giraph.examples.SimpleMasterComputeVertex;
import org.apache.giraph.examples.SimpleMsgVertex;
import org.apache.giraph.examples.SimplePageRankVertex;
import org.apache.giraph.examples.SimplePageRankVertex.SimplePageRankVertexInputFormat;
@@ -412,4 +413,28 @@ public class TestBspBasic extends BspCas
fs.delete(valuesFile, false);
}
}
+
+ /**
+ * Run a sample BSP job with a MasterCompute class set and, when not running
+ * in distributed mode, check the final sum reported by the worker context.
+ * @throws IOException
+ * @throws ClassNotFoundException
+ * @throws InterruptedException
+ */
+ @Test
+ public void testBspMasterCompute()
+ throws IOException, InterruptedException, ClassNotFoundException {
+ GiraphJob job = prepareJob(getCallingMethodName(),
+ SimpleMasterComputeVertex.class, SimplePageRankVertexInputFormat.class);
+ job.setWorkerContextClass(
+ SimpleMasterComputeVertex.SimpleMasterComputeWorkerContext.class);
+ job.setMasterComputeClass(SimpleMasterComputeVertex.SimpleMasterCompute.class);
+ assertTrue(job.run(true));
+ if (!runningInDistributedMode()) {
+ double finalSum =
+ SimpleMasterComputeVertex.SimpleMasterComputeWorkerContext.getFinalSum();
+ System.out.println("testBspMasterCompute: finalSum=" + finalSum);
+ assertEquals(32.5, finalSum);
+ }
+ }
}