You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by tj...@apache.org on 2012/03/24 17:10:28 UTC
svn commit: r1304840 - in /incubator/hama/trunk: ./
core/src/main/java/org/apache/hama/bsp/
examples/src/main/java/org/apache/hama/examples/
Author: tjungblut
Date: Sat Mar 24 16:10:28 2012
New Revision: 1304840
URL: http://svn.apache.org/viewvc?rev=1304840&view=rev
Log:
[HAMA-503]: chainable computations
Added:
incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/Superstep.java (with props)
incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/SuperstepBSP.java (with props)
incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/SuperstepPiEstimator.java (with props)
Modified:
incubator/hama/trunk/CHANGES.txt
incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJob.java
Modified: incubator/hama/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/CHANGES.txt?rev=1304840&r1=1304839&r2=1304840&view=diff
==============================================================================
--- incubator/hama/trunk/CHANGES.txt (original)
+++ incubator/hama/trunk/CHANGES.txt Sat Mar 24 16:10:28 2012
@@ -4,6 +4,7 @@ Release 0.5 - Unreleased
NEW FEATURES
+ HAMA-503: Chainable computations for fault tolerance (tjungblut)
HAMA-517: Add documentation for Graph package (edwardyoon)
HAMA-518: Add MinIntCombiner to SSSP example (edwardyoon)
HAMA-367: Runtime Compression of BSP Messages to Improve the Performance (Apurv Verma via tjungblut)
Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJob.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJob.java?rev=1304840&r1=1304839&r2=1304840&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJob.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJob.java Sat Mar 24 16:10:28 2012
@@ -109,6 +109,19 @@ public class BSPJob extends BSPJobContex
conf.setClass(WORK_CLASS_ATTR, cls, BSP.class);
}
+ /**
+  * Registers the superstep classes of this job and switches the job's BSP
+  * class to {@link SuperstepBSP}.
+  * <p>
+  * The fully qualified class names are written, in the order given and each
+  * one followed by a comma, to the configuration key
+  * {@code hama.supersteps.class}.
+  *
+  * @param classes the superstep classes to register
+  */
+ @SuppressWarnings("rawtypes")
+ public void setSupersteps(Class<? extends Superstep>... classes) {
+   ensureState(JobState.DEFINE);
+
+   // Every class name is terminated by a comma, the last one included.
+   StringBuilder names = new StringBuilder();
+   for (Class<? extends Superstep> superstepClass : classes) {
+     names.append(superstepClass.getName()).append(',');
+   }
+
+   conf.set("hama.supersteps.class", names.toString());
+   this.setBspClass(SuperstepBSP.class);
+ }
+
@SuppressWarnings({ "unchecked", "rawtypes" })
public Class<? extends BSP> getBspClass() {
return (Class<? extends BSP>) conf.getClass(WORK_CLASS_ATTR, BSP.class);
Added: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/Superstep.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/Superstep.java?rev=1304840&view=auto
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/Superstep.java (added)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/Superstep.java Sat Mar 24 16:10:28 2012
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+
+/**
+ * One step of a chained BSP computation.
+ * <p>
+ * {@link SuperstepBSP} drives instances of this class: it calls
+ * {@link #setup(BSPPeer)} after instantiating a superstep, then for each
+ * superstep in turn calls {@link #compute(BSPPeer)} followed by
+ * {@link #haltComputation(BSPPeer)}, and finally calls
+ * {@link #cleanup(BSPPeer)} on every superstep.
+ *
+ * @param <KEYIN> input key type of the peer
+ * @param <VALUEIN> input value type of the peer
+ * @param <KEYOUT> output key type of the peer
+ * @param <VALUEOUT> output value type of the peer
+ * @param <MESSAGE> type of the messages exchanged between peers
+ */
+public abstract class Superstep<KEYIN, VALUEIN, KEYOUT, VALUEOUT, MESSAGE extends Writable> {
+
+  /**
+   * Sets up this superstep; called once before compute(). The default
+   * implementation does nothing.
+   *
+   * @param peer the peer this superstep runs on
+   */
+  protected void setup(BSPPeer<KEYIN, VALUEIN, KEYOUT, VALUEOUT, MESSAGE> peer) {
+    // no-op by default; subclasses override to initialize state
+  }
+
+  /**
+   * Main computation phase of this superstep.
+   *
+   * @param peer the peer this superstep runs on
+   * @throws IOException if the computation fails with an I/O error
+   */
+  protected abstract void compute(
+      BSPPeer<KEYIN, VALUEIN, KEYOUT, VALUEOUT, MESSAGE> peer)
+      throws IOException;
+
+  /**
+   * Tells the caller whether the chain should stop after this superstep.
+   * {@link SuperstepBSP} asks this right after compute() and, on
+   * {@code true}, leaves its loop without the subsequent sync. The default
+   * implementation never halts.
+   *
+   * @param peer the peer this superstep runs on
+   * @return true to halt the computation
+   */
+  protected boolean haltComputation(
+      BSPPeer<KEYIN, VALUEIN, KEYOUT, VALUEOUT, MESSAGE> peer) {
+    return false;
+  }
+
+  /**
+   * Cleans up this superstep; called once after compute(). The default
+   * implementation does nothing.
+   *
+   * @param peer the peer this superstep runs on
+   */
+  protected void cleanup(BSPPeer<KEYIN, VALUEIN, KEYOUT, VALUEOUT, MESSAGE> peer) {
+    // no-op by default; subclasses override to release state
+  }
+
+}
Propchange: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/Superstep.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/SuperstepBSP.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/SuperstepBSP.java?rev=1304840&view=auto
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/SuperstepBSP.java (added)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/SuperstepBSP.java Sat Mar 24 16:10:28 2012
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hama.bsp.BSP;
+import org.apache.hama.bsp.BSPPeer;
+import org.apache.hama.bsp.sync.SyncException;
+
+import java.io.IOException;
+
+/**
+ * A {@link BSP} implementation that executes a configured chain of
+ * {@link Superstep}s, one after another, with a sync between them.
+ * <p>
+ * The superstep classes are read from the configuration key
+ * {@code hama.supersteps.class}; the index of the first superstep to run is
+ * read from {@code attempt.superstep} and defaults to 0.
+ */
+public class SuperstepBSP<K1, V1, K2, V2, M extends Writable> extends
+    BSP<K1, V1, K2, V2, M> {
+
+  // superstep instances, in the order returned by the configuration
+  private Superstep<K1, V1, K2, V2, M>[] supersteps;
+  // index of the superstep at which bsp() starts iterating
+  private int startSuperstep;
+
+  /**
+   * Instantiates every configured superstep class, calls its setup with the
+   * given peer and stores it, then reads the start index from the
+   * configuration.
+   */
+  @SuppressWarnings("unchecked")
+  @Override
+  public void setup(BSPPeer<K1, V1, K2, V2, M> peer) throws IOException,
+      SyncException, InterruptedException {
+    Class<?>[] superstepClasses = peer.getConfiguration().getClasses(
+        "hama.supersteps.class", Superstep.class);
+
+    supersteps = new Superstep[superstepClasses.length];
+    int slot = 0;
+    for (Class<?> superstepClass : superstepClasses) {
+      Superstep<K1, V1, K2, V2, M> step = (Superstep<K1, V1, K2, V2, M>) ReflectionUtils
+          .newInstance(superstepClass, peer.getConfiguration());
+      // the instance is only stored once its own setup has completed
+      step.setup(peer);
+      supersteps[slot++] = step;
+    }
+
+    startSuperstep = peer.getConfiguration().getInt("attempt.superstep", 0);
+  }
+
+  /**
+   * Runs the supersteps from the start index onwards. After each compute the
+   * superstep is asked whether to halt; if so the method returns without
+   * syncing, otherwise the peer syncs and the next superstep runs.
+   */
+  @Override
+  public void bsp(BSPPeer<K1, V1, K2, V2, M> peer) throws IOException,
+      SyncException, InterruptedException {
+    int current = startSuperstep;
+    while (current < supersteps.length) {
+      Superstep<K1, V1, K2, V2, M> step = supersteps[current];
+      step.compute(peer);
+      if (step.haltComputation(peer)) {
+        return;
+      }
+      peer.sync();
+      // once a superstep has been synced the start index is reset
+      startSuperstep = 0;
+      current++;
+    }
+  }
+
+  /**
+   * Calls cleanup on every superstep, in array order.
+   */
+  @Override
+  public void cleanup(BSPPeer<K1, V1, K2, V2, M> peer) throws IOException {
+    for (int i = 0; i < supersteps.length; i++) {
+      supersteps[i].cleanup(peer);
+    }
+  }
+
+}
Propchange: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/SuperstepBSP.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/SuperstepPiEstimator.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/SuperstepPiEstimator.java?rev=1304840&view=auto
==============================================================================
--- incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/SuperstepPiEstimator.java (added)
+++ incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/SuperstepPiEstimator.java Sat Mar 24 16:10:28 2012
@@ -0,0 +1,167 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.examples;
+
+import java.io.IOException;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hama.HamaConfiguration;
+import org.apache.hama.bsp.BSPJob;
+import org.apache.hama.bsp.BSPJobClient;
+import org.apache.hama.bsp.BSPPeer;
+import org.apache.hama.bsp.ClusterStatus;
+import org.apache.hama.bsp.FileOutputFormat;
+import org.apache.hama.bsp.NullInputFormat;
+import org.apache.hama.bsp.Superstep;
+import org.apache.hama.bsp.SuperstepBSP;
+import org.apache.hama.bsp.TextOutputFormat;
+import org.apache.hama.bsp.messages.DoubleMessage;
+
+/**
+ * This PiEstimator uses the new chainable superstep API to be fault tolerant.
+ * <p>
+ * The job consists of two supersteps: {@link PiEstimatorCalculator}, in which
+ * every peer computes a Monte Carlo estimate and sends it to the master peer,
+ * and {@link PiEstimatorAggregator}, in which the master peer averages the
+ * received estimates and writes the result.
+ */
+public class SuperstepPiEstimator {
+
+  // shared variable across multiple supersteps
+  private static String masterTask;
+  // job output directory; printed and deleted again once the job succeeded
+  private static final Path TMP_OUTPUT = new Path("/tmp/pi-"
+      + System.currentTimeMillis());
+
+  /**
+   * First superstep: estimates pi locally and sends the estimate to the
+   * master peer.
+   */
+  public static class PiEstimatorCalculator
+      extends
+      Superstep<NullWritable, NullWritable, Text, DoubleWritable, DoubleMessage> {
+
+    // number of random points sampled per peer
+    private static final int iterations = 10000;
+
+    /**
+     * Stores the name of the peer at index 0 as the master task.
+     */
+    @Override
+    protected void setup(
+        BSPPeer<NullWritable, NullWritable, Text, DoubleWritable, DoubleMessage> peer) {
+      super.setup(peer);
+      // Choose first as a master
+      masterTask = peer.getPeerName(0);
+    }
+
+    /**
+     * Samples random points in the square [-1, 1) x [-1, 1), counts those
+     * closer than 1.0 to the origin and sends 4 * hits / iterations to the
+     * master task.
+     */
+    @Override
+    protected void compute(
+        BSPPeer<NullWritable, NullWritable, Text, DoubleWritable, DoubleMessage> peer)
+        throws IOException {
+      int in = 0;
+      for (int i = 0; i < iterations; i++) {
+        double x = 2.0 * Math.random() - 1.0, y = 2.0 * Math.random() - 1.0;
+        if ((Math.sqrt(x * x + y * y) < 1.0)) {
+          in++;
+        }
+      }
+
+      double data = 4.0 * (double) in / (double) iterations;
+      DoubleMessage estimate = new DoubleMessage(peer.getPeerName(), data);
+
+      peer.send(masterTask, estimate);
+    }
+  }
+
+  /**
+   * Second superstep: the master peer averages all received estimates and
+   * writes the result; afterwards the computation is halted.
+   */
+  protected static class PiEstimatorAggregator
+      extends
+      Superstep<NullWritable, NullWritable, Text, DoubleWritable, DoubleMessage> {
+
+    /**
+     * On the master peer, sums the data of all current messages, divides by
+     * the number of messages and writes the quotient. Other peers do nothing.
+     */
+    @Override
+    protected void compute(
+        BSPPeer<NullWritable, NullWritable, Text, DoubleWritable, DoubleMessage> peer)
+        throws IOException {
+      if (peer.getPeerName().equals(masterTask)) {
+        double pi = 0.0;
+        // must be read before the queue is drained by the loop below
+        int numPeers = peer.getNumCurrentMessages();
+        DoubleMessage received;
+        while ((received = (DoubleMessage) peer.getCurrentMessage()) != null) {
+          pi += received.getData();
+        }
+
+        pi = pi / numPeers;
+        peer.write(new Text("Estimated value of PI is"), new DoubleWritable(pi));
+      }
+    }
+
+    /**
+     * @return always true, this is the last superstep of the chain
+     */
+    @Override
+    protected boolean haltComputation(
+        BSPPeer<NullWritable, NullWritable, Text, DoubleWritable, DoubleMessage> peer) {
+      return true;
+    }
+
+  }
+
+  /**
+   * Copies the first non-empty file of the output directory to standard out
+   * and then deletes the output directory recursively.
+   *
+   * @param conf configuration used to obtain the file system
+   * @throws IOException if listing, reading or deleting fails
+   */
+  private static void printOutput(HamaConfiguration conf) throws IOException {
+    FileSystem fs = FileSystem.get(conf);
+    FileStatus[] files = fs.listStatus(TMP_OUTPUT);
+    // NOTE(review): listStatus may return null instead of throwing when the
+    // path is missing on some file system implementations - guard against it.
+    if (files != null) {
+      for (FileStatus file : files) {
+        if (file.getLen() > 0) {
+          FSDataInputStream in = fs.open(file.getPath());
+          try {
+            IOUtils.copyBytes(in, System.out, conf, false);
+          } finally {
+            // release the stream even if copying fails
+            IOUtils.closeStream(in);
+          }
+          break;
+        }
+      }
+    }
+
+    fs.delete(TMP_OUTPUT, true);
+  }
+
+  /**
+   * Configures and submits the job, then prints the result and the elapsed
+   * time if the job completed successfully.
+   *
+   * @param args optional; args[0] is the number of BSP tasks, otherwise the
+   *          maximum number of tasks reported by the cluster is used
+   */
+  @SuppressWarnings("unchecked")
+  public static void main(String[] args) throws IOException,
+      ClassNotFoundException, InterruptedException {
+    // BSP job configuration
+    HamaConfiguration conf = new HamaConfiguration();
+    BSPJob bsp = new BSPJob(conf, SuperstepBSP.class);
+    // Set the job name
+    bsp.setJobName("Fault Tolerant Pi Estimation Example");
+
+    // set our supersteps, they must be given in execution order
+    bsp.setSupersteps(SuperstepPiEstimator.PiEstimatorCalculator.class,
+        SuperstepPiEstimator.PiEstimatorAggregator.class);
+
+    bsp.setInputFormat(NullInputFormat.class);
+    bsp.setOutputKeyClass(Text.class);
+    bsp.setOutputValueClass(DoubleWritable.class);
+    bsp.setOutputFormat(TextOutputFormat.class);
+    FileOutputFormat.setOutputPath(bsp, TMP_OUTPUT);
+
+    BSPJobClient jobClient = new BSPJobClient(conf);
+    ClusterStatus cluster = jobClient.getClusterStatus(true);
+
+    if (args.length > 0) {
+      bsp.setNumBspTask(Integer.parseInt(args[0]));
+    } else {
+      // Set to maximum
+      bsp.setNumBspTask(cluster.getMaxTasks());
+    }
+
+    long startTime = System.currentTimeMillis();
+    if (bsp.waitForCompletion(true)) {
+      printOutput(conf);
+      System.out.println("Job Finished in "
+          + (double) (System.currentTimeMillis() - startTime) / 1000.0
+          + " seconds");
+    }
+  }
+
+}
Propchange: incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/SuperstepPiEstimator.java
------------------------------------------------------------------------------
svn:eol-style = native