You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by jg...@apache.org on 2012/06/28 21:36:32 UTC

svn commit: r1355128 - in /giraph/trunk: ./ src/main/java/org/apache/giraph/benchmark/ src/main/java/org/apache/giraph/bsp/ src/main/java/org/apache/giraph/examples/ src/main/java/org/apache/giraph/graph/ src/test/java/org/apache/giraph/

Author: jghoman
Date: Thu Jun 28 19:36:29 2012
New Revision: 1355128

URL: http://svn.apache.org/viewvc?rev=1355128&view=rev
Log:
GIRAPH-127: Extending the API with a master.compute() function.  Contributed by Jan van der Lugt.

Added:
    giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleMasterComputeVertex.java
    giraph/trunk/src/main/java/org/apache/giraph/graph/DefaultMasterCompute.java
    giraph/trunk/src/main/java/org/apache/giraph/graph/MasterCompute.java
Modified:
    giraph/trunk/CHANGELOG
    giraph/trunk/src/main/java/org/apache/giraph/benchmark/RandomMessageBenchmark.java
    giraph/trunk/src/main/java/org/apache/giraph/bsp/CentralizedServiceMaster.java
    giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceMaster.java
    giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java
    giraph/trunk/src/main/java/org/apache/giraph/graph/BspUtils.java
    giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java
    giraph/trunk/src/main/java/org/apache/giraph/graph/GlobalStats.java
    giraph/trunk/src/main/java/org/apache/giraph/graph/GraphMapper.java
    giraph/trunk/src/main/java/org/apache/giraph/graph/MasterThread.java
    giraph/trunk/src/test/java/org/apache/giraph/TestBspBasic.java

Modified: giraph/trunk/CHANGELOG
URL: http://svn.apache.org/viewvc/giraph/trunk/CHANGELOG?rev=1355128&r1=1355127&r2=1355128&view=diff
==============================================================================
--- giraph/trunk/CHANGELOG (original)
+++ giraph/trunk/CHANGELOG Thu Jun 28 19:36:29 2012
@@ -2,6 +2,9 @@ Giraph Change Log
 
 Release 0.2.0 - unreleased
 
+  GIRAPH-127: Extending the API with a master.compute() function.
+  (Jan van der Lugt via jghoman)
+
   GIRAPH-220: Default implementation of BasicVertex#sendMsgToAllEdges().
   (Alessandro Presta via jghoman)
 

Modified: giraph/trunk/src/main/java/org/apache/giraph/benchmark/RandomMessageBenchmark.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/benchmark/RandomMessageBenchmark.java?rev=1355128&r1=1355127&r2=1355128&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/benchmark/RandomMessageBenchmark.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/benchmark/RandomMessageBenchmark.java Thu Jun 28 19:36:29 2012
@@ -101,7 +101,6 @@ public class RandomMessageBenchmark impl
     private long totalMessages = 0;
     /** Total millis */
     private long totalMillis = 0;
-    /** Class logger */
 
     @Override
     public void preApplication()

Modified: giraph/trunk/src/main/java/org/apache/giraph/bsp/CentralizedServiceMaster.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/bsp/CentralizedServiceMaster.java?rev=1355128&r1=1355127&r2=1355128&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/bsp/CentralizedServiceMaster.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/bsp/CentralizedServiceMaster.java Thu Jun 28 19:36:29 2012
@@ -20,6 +20,7 @@ package org.apache.giraph.bsp;
 
 import java.io.IOException;
 
+import org.apache.giraph.graph.AggregatorUsage;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.zookeeper.KeeperException;
@@ -36,7 +37,7 @@ import org.apache.zookeeper.KeeperExcept
 @SuppressWarnings("rawtypes")
 public interface CentralizedServiceMaster<
   I extends WritableComparable, V extends Writable, E extends Writable,
-  M extends Writable> extends CentralizedService<I, V, E, M> {
+  M extends Writable> extends CentralizedService<I, V, E, M>, AggregatorUsage {
   /**
    * Become the master.
    * @return true if became the master, false if the application is done.

Added: giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleMasterComputeVertex.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleMasterComputeVertex.java?rev=1355128&view=auto
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleMasterComputeVertex.java (added)
+++ giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleMasterComputeVertex.java Thu Jun 28 19:36:29 2012
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.examples;
+
+import org.apache.giraph.aggregators.DoubleOverwriteAggregator;
+import org.apache.giraph.aggregators.IntOverwriteAggregator;
+import org.apache.giraph.graph.LongDoubleFloatDoubleVertex;
+import org.apache.giraph.graph.MasterCompute;
+import org.apache.giraph.graph.WorkerContext;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.log4j.Logger;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Iterator;
+
+/**
+ * Demonstrates a computation with a centralized part implemented via a
+ * MasterCompute.
+ */
+public class SimpleMasterComputeVertex extends LongDoubleFloatDoubleVertex {
+  /** Aggregator to get values from the master to the workers */
+  public static final String SMC_AGG = "simplemastercompute.aggregator";
+  /** Logger */
+  private static final Logger LOG =
+      Logger.getLogger(SimpleMasterComputeVertex.class);
+
+  @Override
+  public void compute(Iterator<DoubleWritable> msgIterator) {
+    DoubleOverwriteAggregator agg =
+        (DoubleOverwriteAggregator) getAggregator(SMC_AGG);
+    double oldSum = getSuperstep() == 0 ? 0 : getVertexValue().get();
+    double newValue = agg.getAggregatedValue().get();
+    double newSum = oldSum + newValue;
+    setVertexValue(new DoubleWritable(newSum));
+    SimpleMasterComputeWorkerContext workerContext =
+        (SimpleMasterComputeWorkerContext) getWorkerContext();
+    workerContext.setFinalSum(newSum);
+    LOG.info("Current sum: " + newSum);
+  }
+
+  /**
+   * Worker context used with {@link SimpleMasterComputeVertex}.
+   */
+  public static class SimpleMasterComputeWorkerContext
+      extends WorkerContext {
+    /** Final sum value for verification for local jobs */
+    private static double FINAL_SUM;
+
+    @Override
+    public void preApplication()
+      throws InstantiationException, IllegalAccessException {
+      registerAggregator(SMC_AGG, DoubleOverwriteAggregator.class);
+    }
+
+    @Override
+    public void preSuperstep() {
+      useAggregator(SMC_AGG);
+    }
+
+    @Override
+    public void postSuperstep() {
+    }
+
+    @Override
+    public void postApplication() {
+    }
+
+    public void setFinalSum(double sum) {
+      FINAL_SUM = sum;
+    }
+
+    public static double getFinalSum() {
+      return FINAL_SUM;
+    }
+  }
+
+  /**
+   * MasterCompute used with {@link SimpleMasterComputeVertex}.
+   */
+  public static class SimpleMasterCompute
+      extends MasterCompute {
+    @Override
+    public void write(DataOutput out) throws IOException {
+    }
+
+    @Override
+    public void readFields(DataInput in) throws IOException {
+    }
+
+    @Override
+    public void compute() {
+      DoubleOverwriteAggregator agg =
+          (DoubleOverwriteAggregator) getAggregator(SMC_AGG);
+      agg.aggregate(((double) getSuperstep()) / 2 + 1);
+      if (getSuperstep() == 10) {
+        haltComputation();
+      }
+    }
+
+    @Override
+    public void initialize() throws InstantiationException,
+        IllegalAccessException {
+      registerAggregator(SMC_AGG, DoubleOverwriteAggregator.class);
+    }
+  }
+}

Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceMaster.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceMaster.java?rev=1355128&r1=1355127&r2=1355128&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceMaster.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceMaster.java Thu Jun 28 19:36:29 2012
@@ -63,7 +63,6 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.TreeMap;
 import java.util.TreeSet;
 
 import org.apache.giraph.graph.partition.MasterGraphPartitioner;
@@ -131,6 +130,8 @@ public class BspServiceMaster<I extends 
       new ArrayList<PartitionStats>();
   /** Aggregator writer */
   private AggregatorWriter aggregatorWriter;
+  /** Master class */
+  private MasterCompute masterCompute;
 
   /**
    * Constructor for setting up the master.
@@ -429,7 +430,7 @@ public class BspServiceMaster<I extends 
             partitionSet.add(workerInfo.getPartitionId());
           }
           for (int i = 1; i <= maxWorkers; ++i) {
-            if (partitionSet.contains(new Integer(i))) {
+            if (partitionSet.contains(Integer.valueOf(i))) {
               continue;
             } else if (i == getTaskPartition()) {
               continue;
@@ -633,6 +634,7 @@ public class BspServiceMaster<I extends 
         getZkExt().setData(mergedAggregatorPath, aggregatorZkData, -1);
       }
     }
+    masterCompute.readFields(finalizedStream);
     finalizedStream.close();
 
     Map<Integer, PartitionOwner> idOwnerMap =
@@ -748,6 +750,8 @@ public class BspServiceMaster<I extends 
           currentMasterTaskPartitionCounter.increment(
               getTaskPartition() -
               currentMasterTaskPartitionCounter.getValue());
+          masterCompute =
+              BspUtils.createMasterCompute(getConfiguration());
           aggregatorWriter =
               BspUtils.createAggregatorWriter(getConfiguration());
           try {
@@ -843,18 +847,11 @@ public class BspServiceMaster<I extends 
   }
 
   /**
-   * Get the aggregator values for a particular superstep,
-   * aggregate and save them. Does nothing on the INPUT_SUPERSTEP.
+   * Get the aggregator values for a particular superstep and aggregate them.
    *
    * @param superstep superstep to check
    */
   private void collectAndProcessAggregatorValues(long superstep) {
-    if (superstep == INPUT_SUPERSTEP) {
-      // Nothing to collect on the input superstep
-      return;
-    }
-    Map<String, Aggregator<? extends Writable>> aggregatorMap =
-        new TreeMap<String, Aggregator<? extends Writable>>();
     String workerFinishedPath =
         getWorkerFinishedPath(getApplicationAttempt(), superstep);
     List<String> hostnameIdPathList = null;
@@ -913,23 +910,16 @@ public class BspServiceMaster<I extends 
                   AGGREGATOR_CLASS_NAME_KEY);
           @SuppressWarnings("unchecked")
           Aggregator<Writable> aggregator =
-            (Aggregator<Writable>) aggregatorMap.get(aggregatorName);
+            (Aggregator<Writable>) getAggregator(aggregatorName);
           boolean firstTime = false;
           if (aggregator == null) {
             @SuppressWarnings("unchecked")
-            Aggregator<Writable> aggregatorWritable =
-              (Aggregator<Writable>) getAggregator(aggregatorName);
-            aggregator = aggregatorWritable;
-            if (aggregator == null) {
-              @SuppressWarnings("unchecked")
-              Class<? extends Aggregator<Writable>> aggregatorClass =
-                (Class<? extends Aggregator<Writable>>)
-                Class.forName(aggregatorClassName);
-              aggregator = registerAggregator(
-                  aggregatorName,
-                  aggregatorClass);
-            }
-            aggregatorMap.put(aggregatorName, aggregator);
+            Class<? extends Aggregator<Writable>> aggregatorClass =
+              (Class<? extends Aggregator<Writable>>)
+              Class.forName(aggregatorClassName);
+            aggregator = registerAggregator(
+                aggregatorName,
+                aggregatorClass);
             firstTime = true;
           }
           Writable aggregatorValue =
@@ -979,12 +969,21 @@ public class BspServiceMaster<I extends 
         }
       }
     }
+  }
+
+  /**
+   * Save the supplied aggregator values.
+   *
+   * @param superstep superstep for which to save values
+   */
+  private void saveAggregatorValues(long superstep) {
+    Map<String, Aggregator<Writable>> aggregatorMap = getAggregatorMap();
     if (aggregatorMap.size() > 0) {
       String mergedAggregatorPath =
           getMergedAggregatorPath(getApplicationAttempt(), superstep);
       byte [] zkData = null;
       JSONArray aggregatorArray = new JSONArray();
-      for (Map.Entry<String, Aggregator<? extends Writable>> entry :
+      for (Map.Entry<String, Aggregator<Writable>> entry :
         aggregatorMap.entrySet()) {
         try {
           ByteArrayOutputStream outputStream =
@@ -1000,7 +999,7 @@ public class BspServiceMaster<I extends 
               Base64.encodeBytes(outputStream.toByteArray()));
           aggregatorArray.put(aggregatorObj);
           if (LOG.isInfoEnabled()) {
-            LOG.info("collectAndProcessAggregatorValues: " +
+            LOG.info("saveAggregatorValues: " +
                 "Trying to add aggregatorObj " +
                 aggregatorObj + "(" +
                     entry.getValue().getAggregatedValue() +
@@ -1009,11 +1008,11 @@ public class BspServiceMaster<I extends 
           }
         } catch (IOException e) {
           throw new IllegalStateException(
-              "collectAndProcessAggregatorValues: " +
+              "saveAggregatorValues: " +
                   "IllegalStateException", e);
         } catch (JSONException e) {
           throw new IllegalStateException(
-              "collectAndProcessAggregatorValues: JSONException", e);
+              "saveAggregatorValues: JSONException", e);
         }
       }
       try {
@@ -1024,18 +1023,18 @@ public class BspServiceMaster<I extends 
             CreateMode.PERSISTENT,
             true);
       } catch (KeeperException.NodeExistsException e) {
-        LOG.warn("collectAndProcessAggregatorValues: " +
+        LOG.warn("saveAggregatorValues: " +
             mergedAggregatorPath + " already exists!");
       } catch (KeeperException e) {
         throw new IllegalStateException(
-            "collectAndProcessAggregatorValues: KeeperException", e);
+            "saveAggregatorValues: KeeperException", e);
       } catch (InterruptedException e) {
         throw new IllegalStateException(
-            "collectAndProcessAggregatorValues: IllegalStateException",
+            "saveAggregatorValues: IllegalStateException",
             e);
       }
       if (LOG.isInfoEnabled()) {
-        LOG.info("collectAndProcessAggregatorValues: Finished " +
+        LOG.info("saveAggregatorValues: Finished " +
             "loading " +
             mergedAggregatorPath + " with aggregator values " +
             aggregatorArray);
@@ -1090,6 +1089,7 @@ public class BspServiceMaster<I extends 
     } else {
       finalizedOutputStream.writeInt(0);
     }
+    masterCompute.write(finalizedOutputStream);
     finalizedOutputStream.close();
     lastCheckpointedSuperstep = superstep;
     lastCheckpointedSuperstepCounter.increment(superstep -
@@ -1221,10 +1221,10 @@ public class BspServiceMaster<I extends 
       getZkExt().deleteExt(inputSplitsPath, -1, true);
     } catch (InterruptedException e) {
       throw new RuntimeException(
-          "retartFromCheckpoint: InterruptedException", e);
+          "restartFromCheckpoint: InterruptedException", e);
     } catch (KeeperException e) {
       throw new RuntimeException(
-          "retartFromCheckpoint: KeeperException", e);
+          "restartFromCheckpoint: KeeperException", e);
     }
     setApplicationAttempt(getApplicationAttempt() + 1);
     setCachedSuperstep(checkpoint);
@@ -1378,7 +1378,6 @@ public class BspServiceMaster<I extends 
     return true;
   }
 
-
   @Override
   public SuperstepState coordinateSuperstep() throws
   KeeperException, InterruptedException {
@@ -1389,6 +1388,7 @@ public class BspServiceMaster<I extends 
     // 4. Collect and process aggregators
     // 5. Create superstep finished node
     // 6. If the checkpoint frequency is met, finalize the checkpoint
+
     List<WorkerInfo> chosenWorkerInfoList = checkWorkers();
     if (chosenWorkerInfoList == null) {
       LOG.fatal("coordinateSuperstep: Not enough healthy workers for " +
@@ -1449,8 +1449,21 @@ public class BspServiceMaster<I extends 
       return SuperstepState.WORKER_FAILURE;
     }
 
+    // Collect aggregator values, then run the master.compute() and
+    // finally save the aggregator values
     collectAndProcessAggregatorValues(getSuperstep());
+    runMasterCompute(getSuperstep());
+    saveAggregatorValues(getSuperstep());
+
+    // If the master is halted or all the vertices voted to halt and there
+    // are no more messages in the system, stop the computation
     GlobalStats globalStats = aggregateWorkerStats(getSuperstep());
+    if (masterCompute.isHalted() ||
+        (globalStats.getFinishedVertexCount() ==
+        globalStats.getVertexCount() &&
+        globalStats.getMessageCount() == 0)) {
+      globalStats.setHaltComputation(true);
+    }
 
     // Let everyone know the aggregated application state through the
     // superstep finishing znode.
@@ -1515,9 +1528,7 @@ public class BspServiceMaster<I extends 
       superstepCounter.increment(1);
     }
     SuperstepState superstepState;
-    if ((globalStats.getFinishedVertexCount() ==
-        globalStats.getVertexCount()) &&
-        globalStats.getMessageCount() == 0) {
+    if (globalStats.getHaltComputation()) {
       superstepState = SuperstepState.ALL_SUPERSTEPS_DONE;
     } else {
       superstepState = SuperstepState.THIS_SUPERSTEP_DONE;
@@ -1536,6 +1547,37 @@ public class BspServiceMaster<I extends 
   }
 
   /**
+   * Run the master.compute() method
+   *
+   * @param superstep superstep for which to run the master.compute()
+   */
+  private void runMasterCompute(long superstep) {
+    GraphState<I, V, E, M> graphState = getGraphMapper().getGraphState();
+    // The master.compute() should run logically before the workers, so
+    // increase the superstep counter it uses by one
+    graphState.setSuperstep(superstep + 1);
+    graphState.setNumVertices(vertexCounter.getValue());
+    graphState.setNumEdges(edgeCounter.getValue());
+    graphState.setContext(getContext());
+    graphState.setGraphMapper(getGraphMapper());
+    masterCompute.setGraphState(graphState);
+    if (superstep == INPUT_SUPERSTEP) {
+      try {
+        masterCompute.initialize();
+      } catch (InstantiationException e) {
+        LOG.fatal("map: MasterCompute.initialize failed in instantiation", e);
+        throw new RuntimeException(
+            "map: MasterCompute.initialize failed in instantiation", e);
+      } catch (IllegalAccessException e) {
+        LOG.fatal("map: MasterCompute.initialize failed in access", e);
+        throw new RuntimeException(
+            "map: MasterCompute.initialize failed in access", e);
+      }
+    }
+    masterCompute.compute();
+  }
+
+  /**
    * Need to clean up ZooKeeper nicely.  Make sure all the masters and workers
    * have reported ending their ZooKeeper connections.
    */
@@ -1744,4 +1786,15 @@ public class BspServiceMaster<I extends 
 
     return foundEvent;
   }
+
+  /**
+   * Use an aggregator in this superstep. Note that the master uses all
+   * aggregators by default, so calling this function is not necessary.
+   *
+   * @param name Name of aggregator (should be unique)
+   * @return boolean (always true)
+   */
+  public boolean useAggregator(String name) {
+    return true;
+  }
 }

Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java?rev=1355128&r1=1355127&r2=1355128&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java Thu Jun 28 19:36:29 2012
@@ -745,9 +745,6 @@ public class BspServiceWorker<I extends 
    * @param superstep Superstep to get the aggregated values from
    */
   private void getAggregatorValues(long superstep) {
-    if (superstep <= (INPUT_SUPERSTEP + 1)) {
-      return;
-    }
     String mergedAggregatorPath =
         getMergedAggregatorPath(getApplicationAttempt(), superstep - 1);
     JSONArray aggregatorArray = null;
@@ -1061,9 +1058,7 @@ public class BspServiceWorker<I extends 
     getGraphMapper().getGraphState().
     setNumEdges(globalStats.getEdgeCount()).
     setNumVertices(globalStats.getVertexCount());
-    return (globalStats.getFinishedVertexCount() ==
-        globalStats.getVertexCount()) &&
-        (globalStats.getMessageCount() == 0);
+    return globalStats.getHaltComputation();
   }
 
   /**

Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/BspUtils.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/BspUtils.java?rev=1355128&r1=1355127&r2=1355128&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/BspUtils.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/BspUtils.java Thu Jun 28 19:36:29 2012
@@ -325,6 +325,34 @@ public class BspUtils {
     return workerContext;
   }
 
+  /**
+   * Get the user's subclassed {@link MasterCompute}
+   *
+   * @param conf Configuration to check
+   * @return User's master class
+   */
+  public static Class<? extends MasterCompute>
+  getMasterComputeClass(Configuration conf) {
+    return (Class<? extends MasterCompute>)
+      conf.getClass(GiraphJob.MASTER_COMPUTE_CLASS,
+        DefaultMasterCompute.class,
+        MasterCompute.class);
+  }
+
+  /**
+   * Create a user master
+   *
+   * @param conf Configuration to check
+   * @return Instantiated user master
+   */
+  public static MasterCompute
+  createMasterCompute(Configuration conf) {
+    Class<? extends MasterCompute> masterComputeClass =
+        getMasterComputeClass(conf);
+    MasterCompute masterCompute =
+        ReflectionUtils.newInstance(masterComputeClass, conf);
+    return masterCompute;
+  }
 
   /**
    * Get the user's subclassed {@link BasicVertex}

Added: giraph/trunk/src/main/java/org/apache/giraph/graph/DefaultMasterCompute.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/DefaultMasterCompute.java?rev=1355128&view=auto
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/DefaultMasterCompute.java (added)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/DefaultMasterCompute.java Thu Jun 28 19:36:29 2012
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.graph;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+/**
+ * A dumb implementation of {@link MasterCompute}. This is the default
+ * implementation when no MasterCompute is defined by the user. It does
+ * nothing.
+ */
+
+public class DefaultMasterCompute extends MasterCompute {
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+  }
+
+  @Override
+  public void compute() {
+  }
+
+  @Override
+  public void initialize() throws InstantiationException,
+      IllegalAccessException {
+  }
+
+}

Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java?rev=1355128&r1=1355127&r2=1355128&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java Thu Jun 28 19:36:29 2012
@@ -44,6 +44,9 @@ public class GiraphJob {
   public static final String VERTEX_INPUT_FORMAT_CLASS =
       "giraph.vertexInputFormatClass";
 
+  /** Class for Master - optional */
+  public static final String MASTER_COMPUTE_CLASS = "giraph.masterComputeClass";
+
   /** VertexOutputFormat class - optional */
   public static final String VERTEX_OUTPUT_FORMAT_CLASS =
       "giraph.vertexOutputFormatClass";
@@ -464,6 +467,16 @@ public class GiraphJob {
   }
 
   /**
+   * Set the master class (optional)
+   *
+   * @param masterComputeClass Runs master computation
+   */
+  public final void setMasterComputeClass(Class<?> masterComputeClass) {
+    getConfiguration().setClass(MASTER_COMPUTE_CLASS, masterComputeClass,
+        MasterCompute.class);
+  }
+
+  /**
    * Set the vertex output format class (optional)
    *
    * @param vertexOutputFormatClass Determines how graph is output

Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/GlobalStats.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/GlobalStats.java?rev=1355128&r1=1355127&r2=1355128&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/GlobalStats.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/GlobalStats.java Thu Jun 28 19:36:29 2012
@@ -37,6 +37,8 @@ public class GlobalStats implements Writ
   private long edgeCount = 0;
   /** All messages sent in the last superstep */
   private long messageCount = 0;
+  /** Whether the computation should be halted */
+  private boolean haltComputation = false;
 
   /**
    * Add the stats of a partition to the global stats.
@@ -65,6 +67,14 @@ public class GlobalStats implements Writ
     return messageCount;
   }
 
+  public boolean getHaltComputation() {
+    return haltComputation;
+  }
+
+  public void setHaltComputation(boolean value) {
+    haltComputation = value;
+  }
+
   /**
    * Add messages to the global stats.
    *
@@ -80,6 +90,7 @@ public class GlobalStats implements Writ
     finishedVertexCount = input.readLong();
     edgeCount = input.readLong();
     messageCount = input.readLong();
+    haltComputation = input.readBoolean();
   }
 
   @Override
@@ -88,12 +99,13 @@ public class GlobalStats implements Writ
     output.writeLong(finishedVertexCount);
     output.writeLong(edgeCount);
     output.writeLong(messageCount);
+    output.writeBoolean(haltComputation);
   }
 
   @Override
   public String toString() {
     return "(vtx=" + vertexCount + ",finVtx=" +
         finishedVertexCount + ",edges=" + edgeCount + ",msgCount=" +
-        messageCount + ")";
+        messageCount + ",haltComputation=" + haltComputation + ")";
   }
 }

Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/GraphMapper.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/GraphMapper.java?rev=1355128&r1=1355127&r2=1355128&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/GraphMapper.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/GraphMapper.java Thu Jun 28 19:36:29 2012
@@ -19,6 +19,8 @@
 package org.apache.giraph.graph;
 
 import com.google.common.collect.Iterables;
+
+import org.apache.giraph.bsp.CentralizedServiceMaster;
 import org.apache.giraph.bsp.CentralizedServiceWorker;
 import org.apache.giraph.graph.partition.Partition;
 import org.apache.giraph.graph.partition.PartitionOwner;
@@ -62,6 +64,8 @@ public class GraphMapper<I extends Writa
   private static final Logger LOG = Logger.getLogger(GraphMapper.class);
   /** Coordination service worker */
   private CentralizedServiceWorker<I, V, E, M> serviceWorker;
+  /** Coordination service master */
+  private CentralizedServiceMaster<I, V, E, M> serviceMaster;
   /** Coordination service master thread */
   private Thread masterThread = null;
   /** The map should be run exactly once, or else there is a problem. */
@@ -111,7 +115,14 @@ public class GraphMapper<I extends Writa
    * @return Aggregator usage interface
    */
   public final AggregatorUsage getAggregatorUsage() {
-    return serviceWorker;
+    AggregatorUsage result = null;
+    if (serviceWorker != null) {
+      result = serviceWorker;
+    }
+    if (serviceMaster != null) {
+      result = serviceMaster;
+    }
+    return result;
   }
 
   public final WorkerContext getWorkerContext() {
@@ -446,13 +457,12 @@ public class GraphMapper<I extends Writa
           LOG.info("setup: Starting up BspServiceMaster " +
               "(master thread)...");
         }
-        masterThread =
-            new MasterThread<I, V, E, M>(
-                new BspServiceMaster<I, V, E, M>(serverPortList,
-                    sessionMsecTimeout,
-                    context,
-                    this),
-                    context);
+        serviceMaster = new BspServiceMaster<I, V, E, M>(serverPortList,
+                sessionMsecTimeout,
+                context,
+                this);
+        masterThread = new MasterThread<I, V, E, M>(
+                (BspServiceMaster<I, V, E, M>) serviceMaster, context);
         masterThread.start();
       }
       if ((mapFunctions == MapFunctions.WORKER_ONLY) ||

Added: giraph/trunk/src/main/java/org/apache/giraph/graph/MasterCompute.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/MasterCompute.java?rev=1355128&view=auto
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/MasterCompute.java (added)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/MasterCompute.java Thu Jun 28 19:36:29 2012
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.graph;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.Mapper;
+
+/**
+ * Abstract base class for defining a master computation that can perform
+ * centralized computation between supersteps. This class will be instantiated
+ * on the master node and will run every superstep before the workers do.
+ *
+ * Communication with the workers should be performed via aggregators. The
+ * values of the aggregators are broadcast to the workers before
+ * vertex.compute() is called and collected by the master before
+ * master.compute() is called. This means aggregator values used by the workers
+ * are consistent with aggregator values from the master from the same
+ * superstep, and aggregator values used by the master are consistent with
+ * aggregator values from the workers from the previous superstep. Note that
+ * the master has to register its own aggregators (it does not call
+ * {@link WorkerContext} functions), but it uses all aggregators by default,
+ * so useAggregator does not have to be called.
+ */
+@SuppressWarnings("rawtypes")
+public abstract class MasterCompute implements AggregatorUsage, Writable,
+    Configurable {
+  /** If true, halt the computation; set by haltComputation(). */
+  protected boolean halt = false;
+  /** Global graph state */
+  private GraphState graphState;
+  /** Configuration */
+  private Configuration conf;
+
+  /**
+   * Must be defined by user to specify what the master has to do.
+   */
+  public abstract void compute();
+
+  /**
+   * Initialize the MasterCompute class; this is the place to register
+   * aggregators (registerAggregator declares the same checked exceptions).
+   */
+  public abstract void initialize() throws InstantiationException,
+    IllegalAccessException;
+
+  /**
+   * Retrieves the current superstep.
+   *
+   * @return Current superstep
+   */
+  public long getSuperstep() {
+    return getGraphState().getSuperstep();
+  }
+
+  /**
+   * Get the total (all workers) number of vertices that
+   * existed in the previous superstep.
+   *
+   * @return Total number of vertices (-1 if first superstep)
+   */
+  public long getNumVertices() {
+    return getGraphState().getNumVertices();
+  }
+
+  /**
+   * Get the total (all workers) number of edges that
+   * existed in the previous superstep.
+   *
+   * @return Total number of edges (-1 if first superstep)
+   */
+  public long getNumEdges() {
+    return getGraphState().getNumEdges();
+  }
+
+  /**
+   * After this is called, the computation will stop, even if there are
+   * still messages in the system or vertices that have not voted to halt.
+   */
+  public void haltComputation() {
+    halt = true;
+  }
+
+  /**
+   * Has the master halted?
+   *
+   * @return True if halted, false otherwise.
+   */
+  public boolean isHalted() {
+    return halt;
+  }
+
+  /**
+   * Get the graph state for all workers.
+   *
+   * @return Graph state for all workers
+   */
+  GraphState getGraphState() {
+    return graphState;
+  }
+
+  /**
+   * Set the graph state for all workers.
+   *
+   * @param graphState Graph state for all workers
+   */
+  void setGraphState(GraphState graphState) {
+    this.graphState = graphState;
+  }
+
+  /**
+   * Get the mapper context.
+   *
+   * @return Mapper context
+   */
+  public Mapper.Context getContext() {
+    return getGraphState().getContext();
+  }
+
+  @Override
+  public final <A extends Writable> Aggregator<A> registerAggregator(
+    String name, Class<? extends Aggregator<A>> aggregatorClass)
+    throws InstantiationException, IllegalAccessException {
+    return getGraphState().getGraphMapper().getAggregatorUsage().
+        registerAggregator(name, aggregatorClass);
+  }
+
+  @Override
+  public final Aggregator<? extends Writable> getAggregator(String name) {
+    return getGraphState().getGraphMapper().getAggregatorUsage().
+      getAggregator(name);
+  }
+
+  @Override
+  public final boolean useAggregator(String name) {
+    return getGraphState().getGraphMapper().getAggregatorUsage().
+      useAggregator(name);
+  }
+
+  @Override
+  public Configuration getConf() {
+    return conf;
+  }
+
+  @Override
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+  }
+}

Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/MasterThread.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/MasterThread.java?rev=1355128&r1=1355127&r2=1355128&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/MasterThread.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/MasterThread.java Thu Jun 28 19:36:29 2012
@@ -111,7 +111,7 @@ public class MasterThread<I extends Writ
             superstepState = bspServiceMaster.coordinateSuperstep();
             long superstepMillis = System.currentTimeMillis() -
                 startSuperstepMillis;
-            superstepSecsMap.put(new Long(cachedSuperstep),
+            superstepSecsMap.put(Long.valueOf(cachedSuperstep),
                 superstepMillis / 1000.0d);
             if (LOG.isInfoEnabled()) {
               LOG.info("masterThread: Coordination of superstep " +

Modified: giraph/trunk/src/test/java/org/apache/giraph/TestBspBasic.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/test/java/org/apache/giraph/TestBspBasic.java?rev=1355128&r1=1355127&r2=1355128&view=diff
==============================================================================
--- giraph/trunk/src/test/java/org/apache/giraph/TestBspBasic.java (original)
+++ giraph/trunk/src/test/java/org/apache/giraph/TestBspBasic.java Thu Jun 28 19:36:29 2012
@@ -32,6 +32,7 @@ import org.apache.giraph.aggregators.Lon
 import org.apache.giraph.examples.GeneratedVertexReader;
 import org.apache.giraph.examples.SimpleCombinerVertex;
 import org.apache.giraph.examples.SimpleFailVertex;
+import org.apache.giraph.examples.SimpleMasterComputeVertex;
 import org.apache.giraph.examples.SimpleMsgVertex;
 import org.apache.giraph.examples.SimplePageRankVertex;
 import org.apache.giraph.examples.SimplePageRankVertex.SimplePageRankVertexInputFormat;
@@ -412,4 +413,28 @@ public class TestBspBasic extends BspCas
       fs.delete(valuesFile, false);
     }
   }
+  
+  /**
+   * Run a sample BSP job with MasterCompute; checks the sum unless distributed.
+   *
+   * @throws IOException
+   * @throws ClassNotFoundException
+   * @throws InterruptedException
+   */
+  @Test
+  public void testBspMasterCompute()
+      throws IOException, InterruptedException, ClassNotFoundException {
+    GiraphJob job = prepareJob(getCallingMethodName(),
+        SimpleMasterComputeVertex.class, SimplePageRankVertexInputFormat.class);
+    job.setWorkerContextClass(
+        SimpleMasterComputeVertex.SimpleMasterComputeWorkerContext.class);
+    job.setMasterComputeClass(SimpleMasterComputeVertex.SimpleMasterCompute.class);
+    assertTrue(job.run(true));
+    if (!runningInDistributedMode()) {
+      double finalSum =
+          SimpleMasterComputeVertex.SimpleMasterComputeWorkerContext.getFinalSum();
+      System.out.println("testBspMasterCompute: finalSum=" + finalSum);
+      assertEquals(32.5, finalSum);
+    }
+  }
 }