You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ac...@apache.org on 2013/03/25 23:47:11 UTC

git commit: GIRAPH-582: Create a generic option for determining the number of supersteps that a job runs for (aching)

Updated Branches:
  refs/heads/trunk 95ce243fd -> 56fcb519a


GIRAPH-582: Create a generic option for determining the number of
supersteps that a job runs for (aching)


Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/56fcb519
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/56fcb519
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/56fcb519

Branch: refs/heads/trunk
Commit: 56fcb519aa56550bb5fdfa666eba275e0f1b0020
Parents: 95ce243
Author: Avery Ching <ac...@fb.com>
Authored: Mon Mar 25 12:13:14 2013 -0700
Committer: Avery Ching <ac...@fb.com>
Committed: Mon Mar 25 15:46:34 2013 -0700

----------------------------------------------------------------------
 CHANGELOG                                          |    3 +
 .../apache/giraph/conf/GiraphConfiguration.java    |   20 ++++
 .../org/apache/giraph/conf/GiraphConstants.java    |    9 ++
 .../org/apache/giraph/master/BspServiceMaster.java |   19 +++-
 .../java/org/apache/giraph/TestMaxSuperstep.java   |   83 +++++++++++++++
 5 files changed, 132 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/giraph/blob/56fcb519/CHANGELOG
----------------------------------------------------------------------
diff --git a/CHANGELOG b/CHANGELOG
index 84939a4..b695ae0 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -1,6 +1,9 @@
 Giraph Change Log
 
 Release 0.2.0 - unreleased
+  GIRAPH-582: Create a generic option for determining the number of
+  supersteps that a job runs for (aching)
+
   GIRAPH-586: Customizable default vertex value (apresta)
 
   GIRAPH-580: NPE in HiveGiraphRunner when the vertex output format is

http://git-wip-us.apache.org/repos/asf/giraph/blob/56fcb519/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
index ae276f6..3b84831 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
@@ -751,4 +751,24 @@ public class GiraphConfiguration extends Configuration
         get(GiraphConstants.DNS_INTERFACE, "default"),
         get(GiraphConstants.DNS_NAMESERVER, "default"));
   }
+
+  /**
+   * Set the maximum number of supersteps of this application.  After this
+   * many supersteps are executed, the application will shut down.
+   *
+   * @param maxNumberOfSupersteps Maximum number of supersteps
+   */
+  public void setMaxNumberOfSupersteps(int maxNumberOfSupersteps) {
+    setInt(MAX_NUMBER_OF_SUPERSTEPS, maxNumberOfSupersteps);
+  }
+
+  /**
+   * Get the maximum number of supersteps of this application.  After this
+   * many supersteps are executed, the application will shut down.
+   *
+   * @return Maximum number of supersteps
+   */
+  public int getMaxNumberOfSupersteps() {
+    return getInt(MAX_NUMBER_OF_SUPERSTEPS, MAX_NUMBER_OF_SUPERSTEPS_DEFAULT);
+  }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/56fcb519/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
index fe5278b..42f8abc 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
@@ -684,5 +684,14 @@ public interface GiraphConstants {
   String DNS_INTERFACE = "giraph.dns.interface";
   /** Server for hostname resolution */
   String DNS_NAMESERVER = "giraph.dns.nameserver";
+
+  /**
+   * The application will halt after this many supersteps are completed.  For
+   * instance, if it is set to 3, the application will run at most supersteps
+   * 0, 1, and 2 and then go into the shutdown superstep.
+   */
+  String MAX_NUMBER_OF_SUPERSTEPS = "giraph.maxNumberOfSupersteps";
+  /** By default, the number of supersteps is not limited */
+  int MAX_NUMBER_OF_SUPERSTEPS_DEFAULT = -1;
 }
 // CHECKSTYLE: resume InterfaceIsTypeCheck

http://git-wip-us.apache.org/repos/asf/giraph/blob/56fcb519/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java b/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
index 9188a23..6c979d6 100644
--- a/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
+++ b/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
@@ -135,6 +135,8 @@ public class BspServiceMaster<I extends WritableComparable,
   private final int maxWorkers;
   /** Min number of workers */
   private final int minWorkers;
+  /** Max number of supersteps */
+  private final int maxNumberOfSupersteps;
   /** Min % responded workers */
   private final float minPercentResponded;
   /** Msecs to wait for an event */
@@ -196,8 +198,9 @@ public class BspServiceMaster<I extends WritableComparable,
 
     ImmutableClassesGiraphConfiguration<I, V, E, M> conf = getConfiguration();
 
-    maxWorkers = conf.getInt(GiraphConstants.MAX_WORKERS, -1);
-    minWorkers = conf.getInt(GiraphConstants.MIN_WORKERS, -1);
+    maxWorkers = conf.getMaxWorkers();
+    minWorkers = conf.getMinWorkers();
+    maxNumberOfSupersteps = conf.getMaxNumberOfSupersteps();
     minPercentResponded = conf.getFloat(
         GiraphConstants.MIN_PERCENT_RESPONDED, 100.0f);
     eventWaitMsecs = conf.getEventWaitMsecs();
@@ -1517,6 +1520,18 @@ public class BspServiceMaster<I extends WritableComparable,
       globalStats.setHaltComputation(true);
     }
 
+    // If we have completed the maximum number of supersteps, stop
+    // the computation
+    if (maxNumberOfSupersteps !=
+        GiraphConstants.MAX_NUMBER_OF_SUPERSTEPS_DEFAULT &&
+        (getSuperstep() == maxNumberOfSupersteps - 1)) {
+      if (LOG.isInfoEnabled()) {
+        LOG.info("coordinateSuperstep: Finished " + maxNumberOfSupersteps +
+            " supersteps (max specified by the user), halting");
+      }
+      globalStats.setHaltComputation(true);
+    }
+
     // Let everyone know the aggregated application state through the
     // superstep finishing znode.
     String superstepFinishedNode =

http://git-wip-us.apache.org/repos/asf/giraph/blob/56fcb519/giraph-examples/src/test/java/org/apache/giraph/TestMaxSuperstep.java
----------------------------------------------------------------------
diff --git a/giraph-examples/src/test/java/org/apache/giraph/TestMaxSuperstep.java b/giraph-examples/src/test/java/org/apache/giraph/TestMaxSuperstep.java
new file mode 100644
index 0000000..d7ac4e8
--- /dev/null
+++ b/giraph-examples/src/test/java/org/apache/giraph/TestMaxSuperstep.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph;
+
+import java.io.IOException;
+import org.apache.giraph.conf.GiraphClasses;
+import org.apache.giraph.counters.GiraphHadoopCounter;
+import org.apache.giraph.counters.GiraphStats;
+import org.apache.giraph.examples.SimplePageRankVertex.SimplePageRankVertexInputFormat;
+import org.apache.giraph.examples.SimplePageRankVertex.SimplePageRankVertexOutputFormat;
+import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.job.GiraphJob;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Unit test for testing max superstep feature of Giraph
+ */
+public class TestMaxSuperstep extends BspCase {
+  public TestMaxSuperstep() {
+    super(TestMaxSuperstep.class.getName());
+  }
+
+  /**
+   * Simple test vertex class that will run forever (voteToHalt is never
+   * called).
+   */
+  public static class InfiniteLoopVertex extends Vertex<LongWritable,
+      DoubleWritable, FloatWritable, DoubleWritable> {
+    @Override
+    public void compute(Iterable<DoubleWritable> messages) throws IOException {
+      // Do nothing, run forever!
+    }
+  }
+
+  /**
+   * Run a never-halting job capped at 3 supersteps and verify (when not in
+   * distributed mode) that the superstep counter ends at 3.
+   *
+   * @throws IOException
+   * @throws ClassNotFoundException
+   * @throws InterruptedException
+   */
+  @Test
+  public void testMaxSuperstep()
+          throws IOException, InterruptedException, ClassNotFoundException {
+    GiraphClasses<LongWritable, DoubleWritable, FloatWritable, DoubleWritable>
+        classes = new GiraphClasses();
+    classes.setVertexClass(InfiniteLoopVertex.class);
+    classes.setVertexInputFormatClass(SimplePageRankVertexInputFormat.class);
+    classes.setVertexOutputFormatClass(SimplePageRankVertexOutputFormat.class);
+    GiraphJob job = prepareJob(getCallingMethodName(), classes,
+        getTempPath(getCallingMethodName()));
+    job.getConfiguration().setMaxNumberOfSupersteps(3);
+    assertTrue(job.run(true));
+    if (!runningInDistributedMode()) {
+      GiraphHadoopCounter superstepCounter =
+          GiraphStats.getInstance().getSuperstepCounter();
+      assertEquals(3L, superstepCounter.getValue());
+    }
+  }
+}