You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by tj...@apache.org on 2012/03/24 17:10:28 UTC

svn commit: r1304840 - in /incubator/hama/trunk: ./ core/src/main/java/org/apache/hama/bsp/ examples/src/main/java/org/apache/hama/examples/

Author: tjungblut
Date: Sat Mar 24 16:10:28 2012
New Revision: 1304840

URL: http://svn.apache.org/viewvc?rev=1304840&view=rev
Log:
[HAMA-503]: chainable computations

Added:
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/Superstep.java   (with props)
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/SuperstepBSP.java   (with props)
    incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/SuperstepPiEstimator.java   (with props)
Modified:
    incubator/hama/trunk/CHANGES.txt
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJob.java

Modified: incubator/hama/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/CHANGES.txt?rev=1304840&r1=1304839&r2=1304840&view=diff
==============================================================================
--- incubator/hama/trunk/CHANGES.txt (original)
+++ incubator/hama/trunk/CHANGES.txt Sat Mar 24 16:10:28 2012
@@ -4,6 +4,7 @@ Release 0.5 - Unreleased
 
   NEW FEATURES
 
+   HAMA-503: Chainable computations for fault tolerance (tjungblut)
    HAMA-517: Add documentation for Graph package (edwardyoon)
    HAMA-518: Add MinIntCombiner to SSSP example (edwardyoon)
    HAMA-367: Runtime Compression of BSP Messages to Improve the Performance (Apurv Verma via tjungblut)

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJob.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJob.java?rev=1304840&r1=1304839&r2=1304840&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJob.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJob.java Sat Mar 24 16:10:28 2012
@@ -109,6 +109,19 @@ public class BSPJob extends BSPJobContex
     conf.setClass(WORK_CLASS_ATTR, cls, BSP.class);
   }
 
+  @SuppressWarnings("rawtypes")
+  public void setSupersteps(
+      Class<? extends Superstep>... classes) {
+    ensureState(JobState.DEFINE);
+
+    String clazzString = "";
+    for (Class<? extends Superstep> s : classes)
+      clazzString += s.getName() + ",";
+
+    conf.set("hama.supersteps.class", clazzString);
+    this.setBspClass(SuperstepBSP.class);
+  }
+
   @SuppressWarnings({ "unchecked", "rawtypes" })
   public Class<? extends BSP> getBspClass() {
     return (Class<? extends BSP>) conf.getClass(WORK_CLASS_ATTR, BSP.class);

Added: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/Superstep.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/Superstep.java?rev=1304840&view=auto
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/Superstep.java (added)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/Superstep.java Sat Mar 24 16:10:28 2012
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+
+public abstract class Superstep<KEYIN, VALUEIN, KEYOUT, VALUEOUT, MESSAGE extends Writable> {
+
+  /**
+   * Sets up this superstep; called once before compute().
+   */
+  protected void setup(BSPPeer<KEYIN, VALUEIN, KEYOUT, VALUEOUT, MESSAGE> peer) {
+
+  }
+
+  /**
+   * Cleans up this superstep; called once after compute().
+   */
+  protected void cleanup(BSPPeer<KEYIN, VALUEIN, KEYOUT, VALUEOUT, MESSAGE> peer) {
+
+  }
+
+  /**
+   * Main computation phase.
+   */
+  protected abstract void compute(
+      BSPPeer<KEYIN, VALUEIN, KEYOUT, VALUEOUT, MESSAGE> peer)
+      throws IOException;
+
+  /**
+   * @return true to halt the computation
+   */
+  protected boolean haltComputation(
+      BSPPeer<KEYIN, VALUEIN, KEYOUT, VALUEOUT, MESSAGE> peer) {
+    return false;
+  }
+
+}

Propchange: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/Superstep.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/SuperstepBSP.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/SuperstepBSP.java?rev=1304840&view=auto
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/SuperstepBSP.java (added)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/SuperstepBSP.java Sat Mar 24 16:10:28 2012
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hama.bsp.BSP;
+import org.apache.hama.bsp.BSPPeer;
+import org.apache.hama.bsp.sync.SyncException;
+
+import java.io.IOException;
+
+public class SuperstepBSP<K1, V1, K2, V2, M extends Writable> extends
+    BSP<K1, V1, K2, V2, M> {
+
+  private Superstep<K1, V1, K2, V2, M>[] supersteps;
+  private int startSuperstep;
+
+  @SuppressWarnings("unchecked")
+  @Override
+  public void setup(BSPPeer<K1, V1, K2, V2, M> peer) throws IOException,
+      SyncException, InterruptedException {
+    // instantiate our superstep classes
+    Class<?>[] classes = peer.getConfiguration().getClasses(
+        "hama.supersteps.class", Superstep.class);
+
+    supersteps = new Superstep[classes.length];
+    for (int i = 0; i < classes.length; i++) {
+      Superstep<K1, V1, K2, V2, M> newInstance = (Superstep<K1, V1, K2, V2, M>) ReflectionUtils
+          .newInstance(classes[i], peer.getConfiguration());
+      newInstance.setup(peer);
+      supersteps[i] = newInstance;
+    }
+    startSuperstep = peer.getConfiguration().getInt("attempt.superstep", 0);
+  }
+
+  @Override
+  public void bsp(BSPPeer<K1, V1, K2, V2, M> peer) throws IOException,
+      SyncException, InterruptedException {
+    for (int index = startSuperstep; index < supersteps.length; index++) {
+      Superstep<K1, V1, K2, V2, M> superstep = supersteps[index];
+      superstep.compute(peer);
+      if (superstep.haltComputation(peer)) {
+        break;
+      }
+      peer.sync();
+      startSuperstep = 0;
+    }
+  }
+
+  @Override
+  public void cleanup(BSPPeer<K1, V1, K2, V2, M> peer) throws IOException {
+    for (Superstep<K1, V1, K2, V2, M> superstep : supersteps) {
+      superstep.cleanup(peer);
+    }
+  }
+
+}

Propchange: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/SuperstepBSP.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/SuperstepPiEstimator.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/SuperstepPiEstimator.java?rev=1304840&view=auto
==============================================================================
--- incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/SuperstepPiEstimator.java (added)
+++ incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/SuperstepPiEstimator.java Sat Mar 24 16:10:28 2012
@@ -0,0 +1,167 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.examples;
+
+import java.io.IOException;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hama.HamaConfiguration;
+import org.apache.hama.bsp.BSPJob;
+import org.apache.hama.bsp.BSPJobClient;
+import org.apache.hama.bsp.BSPPeer;
+import org.apache.hama.bsp.ClusterStatus;
+import org.apache.hama.bsp.FileOutputFormat;
+import org.apache.hama.bsp.NullInputFormat;
+import org.apache.hama.bsp.Superstep;
+import org.apache.hama.bsp.SuperstepBSP;
+import org.apache.hama.bsp.TextOutputFormat;
+import org.apache.hama.bsp.messages.DoubleMessage;
+
+/**
+ * This PiEstimator uses the new chainable superstep API to be fault tolerant.
+ */
+public class SuperstepPiEstimator {
+
+  // shared variable across multiple supersteps
+  private static String masterTask;
+  private static final Path TMP_OUTPUT = new Path("/tmp/pi-"
+      + System.currentTimeMillis());
+
+  public static class PiEstimatorCalculator
+      extends
+      Superstep<NullWritable, NullWritable, Text, DoubleWritable, DoubleMessage> {
+
+    private static final int iterations = 10000;
+
+    @Override
+    protected void setup(
+        BSPPeer<NullWritable, NullWritable, Text, DoubleWritable, DoubleMessage> peer) {
+      super.setup(peer);
+      // Choose first as a master
+      masterTask = peer.getPeerName(0);
+    }
+
+    @Override
+    protected void compute(
+        BSPPeer<NullWritable, NullWritable, Text, DoubleWritable, DoubleMessage> peer)
+        throws IOException {
+      int in = 0;
+      for (int i = 0; i < iterations; i++) {
+        double x = 2.0 * Math.random() - 1.0, y = 2.0 * Math.random() - 1.0;
+        if ((Math.sqrt(x * x + y * y) < 1.0)) {
+          in++;
+        }
+      }
+
+      double data = 4.0 * (double) in / (double) iterations;
+      DoubleMessage estimate = new DoubleMessage(peer.getPeerName(), data);
+
+      peer.send(masterTask, estimate);
+    }
+  }
+
+  protected static class PiEstimatorAggregator
+      extends
+      Superstep<NullWritable, NullWritable, Text, DoubleWritable, DoubleMessage> {
+
+    @Override
+    protected void compute(
+        BSPPeer<NullWritable, NullWritable, Text, DoubleWritable, DoubleMessage> peer)
+        throws IOException {
+      if (peer.getPeerName().equals(masterTask)) {
+        double pi = 0.0;
+        int numPeers = peer.getNumCurrentMessages();
+        DoubleMessage received;
+        while ((received = (DoubleMessage) peer.getCurrentMessage()) != null) {
+          pi += received.getData();
+        }
+
+        pi = pi / numPeers;
+        peer.write(new Text("Estimated value of PI is"), new DoubleWritable(pi));
+      }
+    }
+
+    @Override
+    protected boolean haltComputation(
+        BSPPeer<NullWritable, NullWritable, Text, DoubleWritable, DoubleMessage> peer) {
+      return true;
+    }
+
+  }
+
+  private static void printOutput(HamaConfiguration conf) throws IOException {
+    FileSystem fs = FileSystem.get(conf);
+    FileStatus[] files = fs.listStatus(TMP_OUTPUT);
+    for (FileStatus file : files) {
+      if (file.getLen() > 0) {
+        FSDataInputStream in = fs.open(file.getPath());
+        IOUtils.copyBytes(in, System.out, conf, false);
+        in.close();
+        break;
+      }
+    }
+
+    fs.delete(TMP_OUTPUT, true);
+  }
+
+  @SuppressWarnings("unchecked")
+  public static void main(String[] args) throws IOException,
+      ClassNotFoundException, InterruptedException {
+    // BSP job configuration
+    HamaConfiguration conf = new HamaConfiguration();
+    BSPJob bsp = new BSPJob(conf, SuperstepBSP.class);
+    // Set the job name
+    bsp.setJobName("Fault Tolerant Pi Estimation Example");
+
+    // set our supersteps; they must be given in execution order
+    bsp.setSupersteps(SuperstepPiEstimator.PiEstimatorCalculator.class,
+        SuperstepPiEstimator.PiEstimatorAggregator.class);
+
+    bsp.setInputFormat(NullInputFormat.class);
+    bsp.setOutputKeyClass(Text.class);
+    bsp.setOutputValueClass(DoubleWritable.class);
+    bsp.setOutputFormat(TextOutputFormat.class);
+    FileOutputFormat.setOutputPath(bsp, TMP_OUTPUT);
+
+    BSPJobClient jobClient = new BSPJobClient(conf);
+    ClusterStatus cluster = jobClient.getClusterStatus(true);
+
+    if (args.length > 0) {
+      bsp.setNumBspTask(Integer.parseInt(args[0]));
+    } else {
+      // Set to maximum
+      bsp.setNumBspTask(cluster.getMaxTasks());
+    }
+
+    long startTime = System.currentTimeMillis();
+    if (bsp.waitForCompletion(true)) {
+      printOutput(conf);
+      System.out.println("Job Finished in "
+          + (double) (System.currentTimeMillis() - startTime) / 1000.0
+          + " seconds");
+    }
+  }
+
+}

Propchange: incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/SuperstepPiEstimator.java
------------------------------------------------------------------------------
    svn:eol-style = native