You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ac...@apache.org on 2013/06/27 04:02:20 UTC

git commit: updated refs/heads/trunk to df64dd7

Updated Branches:
  refs/heads/trunk 110d77c55 -> df64dd7b8


GIRAPH-698: Expose Computation to a user (aching)


Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/df64dd7b
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/df64dd7b
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/df64dd7b

Branch: refs/heads/trunk
Commit: df64dd7b8094864b3857d95309b1278c48be7322
Parents: 110d77c
Author: Avery Ching <ac...@fb.com>
Authored: Wed Jun 26 14:07:16 2013 -0700
Committer: Avery Ching <ac...@fb.com>
Committed: Wed Jun 26 19:01:53 2013 -0700

----------------------------------------------------------------------
 CHANGELOG                                           |  2 ++
 .../apache/giraph/bsp/CentralizedServiceMaster.java |  8 ++++++++
 .../org/apache/giraph/counters/GiraphTimers.java    | 15 +++++++++------
 .../org/apache/giraph/master/BspServiceMaster.java  | 13 ++++++++++++-
 .../org/apache/giraph/master/MasterCompute.java     | 10 ++++++++++
 .../java/org/apache/giraph/master/MasterThread.java |  9 +++++++--
 .../org/apache/giraph/master/SuperstepClasses.java  | 12 +++++++++---
 .../giraph/master/TestComputationCombinerTypes.java | 16 ++++++++--------
 8 files changed, 65 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/giraph/blob/df64dd7b/CHANGELOG
----------------------------------------------------------------------
diff --git a/CHANGELOG b/CHANGELOG
index 5804cab..23a6b71 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -1,6 +1,8 @@
 Giraph Change Log
 
 Release 1.1.0 - unreleased
+  GIRAPH-698: Expose Computation to a user (aching)
+
   GIRAPH-311:  Master halting in superstep 0 is ignored by workers (majakabiljo)
 
   GIRAPH-688: Make sure Giraph builds against all compatible YARN-enabled Hadoop versions,

http://git-wip-us.apache.org/repos/asf/giraph/blob/df64dd7b/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceMaster.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceMaster.java b/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceMaster.java
index f41fc3d..999888d 100644
--- a/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceMaster.java
+++ b/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceMaster.java
@@ -19,6 +19,7 @@
 package org.apache.giraph.bsp;
 
 import org.apache.giraph.master.MasterAggregatorHandler;
+import org.apache.giraph.master.MasterCompute;
 import org.apache.giraph.master.MasterInfo;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
@@ -123,6 +124,13 @@ public interface CentralizedServiceMaster<I extends WritableComparable,
   MasterAggregatorHandler getAggregatorHandler();
 
   /**
+   * Get MasterCompute object
+   *
+   * @return MasterCompute object
+   */
+  MasterCompute getMasterCompute();
+
+  /**
    * Superstep has finished.
    */
   void postSuperstep();

http://git-wip-us.apache.org/repos/asf/giraph/blob/df64dd7b/giraph-core/src/main/java/org/apache/giraph/counters/GiraphTimers.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/counters/GiraphTimers.java b/giraph-core/src/main/java/org/apache/giraph/counters/GiraphTimers.java
index 0d50c29..cbf2470 100644
--- a/giraph-core/src/main/java/org/apache/giraph/counters/GiraphTimers.java
+++ b/giraph-core/src/main/java/org/apache/giraph/counters/GiraphTimers.java
@@ -34,11 +34,11 @@ public class GiraphTimers extends HadoopCountersBase {
   /** Counter group name for the giraph timers */
   public static final String GROUP_NAME = "Giraph Timers";
   /** Counter name for setup msec */
-  public static final String SETUP_MS_NAME = "Setup (milliseconds)";
+  public static final String SETUP_MS_NAME = "Setup (ms)";
   /** Counter name for total msec */
-  public static final String TOTAL_MS_NAME = "Total (milliseconds)";
+  public static final String TOTAL_MS_NAME = "Total (ms)";
   /** Counter name for shutdown msec */
-  public static final String SHUTDOWN_MS_NAME = "Shutdown (milliseconds)";
+  public static final String SHUTDOWN_MS_NAME = "Shutdown (ms)";
 
   /** Singleton instance for everyone to use */
   private static GiraphTimers INSTANCE;
@@ -103,18 +103,21 @@ public class GiraphTimers extends HadoopCountersBase {
    * Get counter for superstep time in milliseconds
    *
    * @param superstep Integer superstep number.
+   * @param computationName Name of the computation for display (may be null)
    * @return Counter for setup time in milliseconds
    */
-  public GiraphHadoopCounter getSuperstepMs(long superstep) {
+  public GiraphHadoopCounter getSuperstepMs(long superstep,
+                                            String computationName) {
     GiraphHadoopCounter counter = superstepMsec.get(superstep);
     if (counter == null) {
       String counterPrefix;
       if (superstep == -1) {
         counterPrefix = "Input superstep";
       } else {
-        counterPrefix = "Superstep " + superstep;
+        counterPrefix = "Superstep " + superstep +
+            (computationName == null ? "" : " " + computationName);
       }
-      counter = getCounter(counterPrefix + " (milliseconds)");
+      counter = getCounter(counterPrefix + " (ms)");
       superstepMsec.put(superstep, counter);
     }
     return counter;

http://git-wip-us.apache.org/repos/asf/giraph/blob/df64dd7b/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java b/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
index 3558887..1d3cff0 100644
--- a/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
+++ b/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
@@ -717,6 +717,11 @@ public class BspServiceMaster<I extends WritableComparable,
     return aggregatorHandler;
   }
 
+  @Override
+  public MasterCompute getMasterCompute() {
+    return masterCompute;
+  }
+
   /**
    * Read the finalized checkpoint file and associated metadata files for the
    * checkpoint.  Modifies the {@link PartitionOwner} objects to get the
@@ -1636,7 +1641,13 @@ public class BspServiceMaster<I extends WritableComparable,
       globalStats.setHaltComputation(true);
     }
 
-    superstepClasses.verifyTypesMatch(getConfiguration());
+    // Superstep 0 doesn't need to have matching types (Message types may not
+    // match) and if the computation is halted, no need to check any of
+    // the types.
+    if (!globalStats.getHaltComputation()) {
+      superstepClasses.verifyTypesMatch(
+          getConfiguration(), getSuperstep() != 0);
+    }
     getConfiguration().updateSuperstepClasses(superstepClasses);
 
     // Let everyone know the aggregated application state through the

http://git-wip-us.apache.org/repos/asf/giraph/blob/df64dd7b/giraph-core/src/main/java/org/apache/giraph/master/MasterCompute.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/master/MasterCompute.java b/giraph-core/src/main/java/org/apache/giraph/master/MasterCompute.java
index 310cb26..a3427dc 100644
--- a/giraph-core/src/main/java/org/apache/giraph/master/MasterCompute.java
+++ b/giraph-core/src/main/java/org/apache/giraph/master/MasterCompute.java
@@ -134,6 +134,11 @@ public abstract class MasterCompute
    * @return Computation class
    */
   public final Class<? extends Computation> getComputation() {
+    // Might be called prior to classes being set; avoid throwing an NPE
+    if (superstepClasses == null) {
+      return null;
+    }
+
     return superstepClasses.getComputationClass();
   }
 
@@ -152,6 +157,11 @@ public abstract class MasterCompute
    * @return Combiner class
    */
   public final Class<? extends Combiner> getCombiner() {
+    // Might be called prior to classes being set; avoid throwing an NPE
+    if (superstepClasses == null) {
+      return null;
+    }
+
     return superstepClasses.getCombinerClass();
   }
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/df64dd7b/giraph-core/src/main/java/org/apache/giraph/master/MasterThread.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/master/MasterThread.java b/giraph-core/src/main/java/org/apache/giraph/master/MasterThread.java
index e8eeeed..ec1733c 100644
--- a/giraph-core/src/main/java/org/apache/giraph/master/MasterThread.java
+++ b/giraph-core/src/main/java/org/apache/giraph/master/MasterThread.java
@@ -23,6 +23,7 @@ import org.apache.giraph.bsp.BspService;
 import org.apache.giraph.bsp.CentralizedServiceMaster;
 import org.apache.giraph.bsp.SuperstepState;
 import org.apache.giraph.counters.GiraphTimers;
+import org.apache.giraph.graph.Computation;
 import org.apache.giraph.metrics.GiraphMetrics;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
@@ -109,6 +110,8 @@ public class MasterThread<I extends WritableComparable, V extends Writable,
             long startSuperstepMillis = System.currentTimeMillis();
             cachedSuperstep = bspServiceMaster.getSuperstep();
             GiraphMetrics.get().resetSuperstepMetrics(cachedSuperstep);
+            Class<? extends Computation> computationClass =
+                bspServiceMaster.getMasterCompute().getComputation();
             superstepState = bspServiceMaster.coordinateSuperstep();
             long superstepMillis = System.currentTimeMillis() -
                 startSuperstepMillis;
@@ -123,8 +126,10 @@ public class MasterThread<I extends WritableComparable, V extends Writable,
                   bspServiceMaster.getSuperstep());
             }
             if (superstepCounterOn) {
-              GiraphTimers.getInstance().getSuperstepMs(cachedSuperstep).
-                increment(superstepMillis);
+              String computationName = (computationClass == null) ?
+                  null : computationClass.getSimpleName();
+              GiraphTimers.getInstance().getSuperstepMs(cachedSuperstep,
+                  computationName).increment(superstepMillis);
             }
 
             bspServiceMaster.postSuperstep();

http://git-wip-us.apache.org/repos/asf/giraph/blob/df64dd7b/giraph-core/src/main/java/org/apache/giraph/master/SuperstepClasses.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/master/SuperstepClasses.java b/giraph-core/src/main/java/org/apache/giraph/master/SuperstepClasses.java
index 13bb492..7a7df05 100644
--- a/giraph-core/src/main/java/org/apache/giraph/master/SuperstepClasses.java
+++ b/giraph-core/src/main/java/org/apache/giraph/master/SuperstepClasses.java
@@ -93,8 +93,11 @@ public class SuperstepClasses implements Writable {
    * don't match an {@link IllegalStateException} will be thrown.
    *
    * @param conf Configuration to verify this with
+   * @param checkMatchingMesssageTypes Check that the incoming/outgoing
+   *                                   message types match
    */
-  public void verifyTypesMatch(ImmutableClassesGiraphConfiguration conf) {
+  public void verifyTypesMatch(ImmutableClassesGiraphConfiguration conf,
+                               boolean checkMatchingMesssageTypes) {
     // In some cases, for example when using Jython, the Computation class may
     // not be set. This is because it is created by a ComputationFactory
     // dynamically and not known ahead of time. In this case there is nothing to
@@ -111,8 +114,11 @@ public class SuperstepClasses implements Writable {
         "Vertex value", computationClass);
     verifyTypes(conf.getEdgeValueClass(), computationTypes[2],
         "Edge value", computationClass);
-    verifyTypes(conf.getOutgoingMessageValueClass(), computationTypes[3],
-        "Previous outgoing and new incoming message", computationClass);
+
+    if (checkMatchingMesssageTypes) {
+      verifyTypes(conf.getOutgoingMessageValueClass(), computationTypes[3],
+          "Previous outgoing and new incoming message", computationClass);
+    }
     Class<?> outgoingMessageType = computationTypes[4];
     if (outgoingMessageType.isInterface()) {
       throw new IllegalStateException("verifyTypesMatch: " +

http://git-wip-us.apache.org/repos/asf/giraph/blob/df64dd7b/giraph-core/src/test/java/org/apache/giraph/master/TestComputationCombinerTypes.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/master/TestComputationCombinerTypes.java b/giraph-core/src/test/java/org/apache/giraph/master/TestComputationCombinerTypes.java
index 8ae09bc..f9bd4e4 100644
--- a/giraph-core/src/test/java/org/apache/giraph/master/TestComputationCombinerTypes.java
+++ b/giraph-core/src/test/java/org/apache/giraph/master/TestComputationCombinerTypes.java
@@ -39,7 +39,7 @@ public class TestComputationCombinerTypes {
   public void testAllMatchWithoutCombiner() {
     SuperstepClasses classes =
         new SuperstepClasses(IntNoOpComputation.class, null);
-    classes.verifyTypesMatch(createConfiguration(IntNoOpComputation.class));
+    classes.verifyTypesMatch(createConfiguration(IntNoOpComputation.class), true);
   }
 
   @Test
@@ -48,7 +48,7 @@ public class TestComputationCombinerTypes {
         new SuperstepClasses(IntIntIntLongDoubleComputation.class,
             IntDoubleCombiner.class);
     classes.verifyTypesMatch(
-        createConfiguration(IntIntIntIntLongComputation.class));
+        createConfiguration(IntIntIntIntLongComputation.class), true);
   }
 
   @Test(expected = IllegalStateException.class)
@@ -56,7 +56,7 @@ public class TestComputationCombinerTypes {
     SuperstepClasses classes =
         new SuperstepClasses(LongIntIntLongIntComputation.class, null);
     classes.verifyTypesMatch(
-        createConfiguration(IntIntIntIntLongComputation.class));
+        createConfiguration(IntIntIntIntLongComputation.class), true);
   }
 
   @Test(expected = IllegalStateException.class)
@@ -64,7 +64,7 @@ public class TestComputationCombinerTypes {
     SuperstepClasses classes =
         new SuperstepClasses(IntLongIntLongIntComputation.class, null);
     classes.verifyTypesMatch(
-        createConfiguration(IntIntIntIntLongComputation.class));
+        createConfiguration(IntIntIntIntLongComputation.class), true);
   }
 
   @Test(expected = IllegalStateException.class)
@@ -72,7 +72,7 @@ public class TestComputationCombinerTypes {
     SuperstepClasses classes =
         new SuperstepClasses(IntIntLongLongIntComputation.class, null);
     classes.verifyTypesMatch(
-        createConfiguration(IntIntIntIntLongComputation.class));
+        createConfiguration(IntIntIntIntLongComputation.class), true);
   }
 
   @Test(expected = IllegalStateException.class)
@@ -80,7 +80,7 @@ public class TestComputationCombinerTypes {
     SuperstepClasses classes =
         new SuperstepClasses(IntIntIntIntLongComputation.class, null);
     classes.verifyTypesMatch(
-        createConfiguration(IntIntIntLongDoubleComputation.class));
+        createConfiguration(IntIntIntLongDoubleComputation.class), true);
   }
 
   @Test(expected = IllegalStateException.class)
@@ -89,7 +89,7 @@ public class TestComputationCombinerTypes {
         new SuperstepClasses(IntIntIntLongDoubleComputation.class,
             DoubleDoubleCombiner.class);
     classes.verifyTypesMatch(
-        createConfiguration(IntIntIntIntLongComputation.class));
+        createConfiguration(IntIntIntIntLongComputation.class), true);
   }
 
   @Test(expected = IllegalStateException.class)
@@ -98,7 +98,7 @@ public class TestComputationCombinerTypes {
         new SuperstepClasses(IntIntIntLongDoubleComputation.class,
             IntLongCombiner.class);
     classes.verifyTypesMatch(
-        createConfiguration(IntIntIntIntLongComputation.class));
+        createConfiguration(IntIntIntIntLongComputation.class), true);
   }
 
   private static ImmutableClassesGiraphConfiguration createConfiguration(