You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by ed...@apache.org on 2010/06/03 02:58:29 UTC
svn commit: r950840 - in /incubator/hama/trunk: ./
src/java/org/apache/hama/bsp/ src/test/org/apache/hama/bsp/
Author: edwardyoon
Date: Thu Jun 3 00:58:29 2010
New Revision: 950840
URL: http://svn.apache.org/viewvc?rev=950840&view=rev
Log:
Design BSP program code interface, Add BSP interfaces
Added:
incubator/hama/trunk/src/java/org/apache/hama/bsp/BSP.java
Removed:
incubator/hama/trunk/src/java/org/apache/hama/bsp/Work.java
Modified:
incubator/hama/trunk/CHANGES.txt
incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPInterface.java
incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPJob.java
incubator/hama/trunk/src/test/org/apache/hama/bsp/BSPTestDriver.java
incubator/hama/trunk/src/test/org/apache/hama/bsp/UserInterface.java
Modified: incubator/hama/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/CHANGES.txt?rev=950840&r1=950839&r2=950840&view=diff
==============================================================================
--- incubator/hama/trunk/CHANGES.txt (original)
+++ incubator/hama/trunk/CHANGES.txt Thu Jun 3 00:58:29 2010
@@ -43,6 +43,7 @@ Trunk (unreleased changes)
IMPROVEMENTS
+ HAMA-257: Design BSP program code interface (edwardyoon)
HAMA-261: Remove meaningless unit tests (edwardyoon)
HAMA-253: Change the blog link url (edwardyoon)
HAMA-247: Discuss / Refactor HamaMaster and GroomServer (hyunsik)
Added: incubator/hama/trunk/src/java/org/apache/hama/bsp/BSP.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/BSP.java?rev=950840&view=auto
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/BSP.java (added)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/BSP.java Thu Jun 3 00:58:29 2010
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.zookeeper.KeeperException;
+
+/**
+ * Abstract base for BSP programs: a thread running the subclass's bsp method.
+ */
+public abstract class BSP extends Thread implements BSPInterface {
+ private final BSPPeer peer;
+
+ /**
+ * Creates the {@link BSPPeer} that is later handed to the bsp method.
+ *
+ * @param conf configuration passed to the {@link BSPPeer} constructor
+ * @throws IOException propagated from the {@link BSPPeer} constructor
+ */
+ public BSP(Configuration conf) throws IOException {
+ this.peer = new BSPPeer(conf);
+ }
+
+ /**
+ * A thread's run method.
+ *
+ * Calls {@link org.apache.hama.bsp.BSPInterface#bsp(BSPPeer)} with this
+ * thread's peer; failures are printed because run() cannot rethrow them.
+ */
+ public void run() {
+ try {
+ bsp(peer);
+ } catch (IOException e) {
+ // Reported only: Thread.run() declares no checked exceptions.
+ e.printStackTrace();
+ } catch (KeeperException e) {
+ // Reported only: Thread.run() declares no checked exceptions.
+ e.printStackTrace();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt(); // keep the interrupt visible to callers
+ e.printStackTrace();
+ }
+ }
+}
Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPInterface.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPInterface.java?rev=950840&r1=950839&r2=950840&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPInterface.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPInterface.java Thu Jun 3 00:58:29 2010
@@ -1,5 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.hama.bsp;
+import java.io.IOException;
+
+import org.apache.zookeeper.KeeperException;
+
+/**
+ * BSPInterface declares the single method a program implements to be run in
+ * the BSP style.
+ */
public interface BSPInterface {
+ /**
+ * A user-defined function for programming in the BSP style.
+ *
+ * Applications can use the {@link org.apache.hama.bsp.BSPPeer} to handle the
+ * communication and synchronization between processors.
+ *
+ * @param peer the peer the implementation uses to communicate and synchronize
+ * @throws IOException if the implementation hits an I/O failure
+ * @throws KeeperException presumably on a ZooKeeper failure (TODO confirm)
+ * @throws InterruptedException if the executing thread is interrupted
+ */
+ public void bsp(BSPPeer peer) throws IOException, KeeperException,
+ InterruptedException;
+
}
Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPJob.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPJob.java?rev=950840&r1=950839&r2=950840&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPJob.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPJob.java Thu Jun 3 00:58:29 2010
@@ -67,10 +67,16 @@ public class BSPJob extends BSPJobContex
conf.set("bsp.working.dir", dir.toString());
}
- public void setWorkClass(Class<? extends Work> cls)
+ /**
+ * Set the BSP algorithm class for the job.
+ *
+ * @param cls
+ * @throws IllegalStateException
+ */
+ public void setBspClass(Class<? extends BSP> cls)
throws IllegalStateException {
ensureState(JobState.DEFINE);
- conf.setClass(WORK_CLASS_ATTR, cls, Work.class);
+ conf.setClass(WORK_CLASS_ATTR, cls, BSP.class);
}
public void setJar(String jar) {
@@ -126,10 +132,6 @@ public class BSPJob extends BSPJobContex
conf.setClass(OUTPUT_FORMAT_CLASS_ATTR, cls, OutputFormat.class);
}
- public void setBSPCode(Class<? extends BSPInterface> class1) {
- // TODO Auto-generated method stub
- }
-
public void setUser(String user) {
conf.set("user.name", user);
}
Modified: incubator/hama/trunk/src/test/org/apache/hama/bsp/BSPTestDriver.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/test/org/apache/hama/bsp/BSPTestDriver.java?rev=950840&r1=950839&r2=950840&view=diff
==============================================================================
--- incubator/hama/trunk/src/test/org/apache/hama/bsp/BSPTestDriver.java (original)
+++ incubator/hama/trunk/src/test/org/apache/hama/bsp/BSPTestDriver.java Thu Jun 3 00:58:29 2010
@@ -13,8 +13,8 @@ public class BSPTestDriver {
*/
public static void main(String[] args) throws IOException, InterruptedException {
BSPJob job = new BSPJob(new HamaConfiguration());
- job.setJarByClass(Work.class);
- job.setWorkClass(Work.class);
+ job.setJarByClass(BSP.class);
+ job.setBspClass(BSP.class);
job.submit();
Thread.sleep(3000);
Modified: incubator/hama/trunk/src/test/org/apache/hama/bsp/UserInterface.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/test/org/apache/hama/bsp/UserInterface.java?rev=950840&r1=950839&r2=950840&view=diff
==============================================================================
--- incubator/hama/trunk/src/test/org/apache/hama/bsp/UserInterface.java (original)
+++ incubator/hama/trunk/src/test/org/apache/hama/bsp/UserInterface.java Thu Jun 3 00:58:29 2010
@@ -1,105 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.hama.bsp;
import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
import java.util.List;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.InputFormat;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.OutputCommitter;
-import org.apache.hadoop.mapreduce.OutputFormat;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.RecordWriter;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hama.Constants;
import org.apache.hama.HamaCluster;
import org.apache.hama.HamaConfiguration;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.data.Stat;
public class UserInterface extends HamaCluster implements Watcher {
private HamaConfiguration conf;
- private String JOBNAME = "hama.test.bsp";
- private Path INPUTPATH = new Path("/tmp/input");
- private Path OUTPUTPATH = new Path("/tmp/output");
+ private int NUM_PEER = 10;
+ List<PiEstimator> list = new ArrayList<PiEstimator>(NUM_PEER);
- public UserInterface() {
- this.conf = getConf();
- }
-
- public void testScenario() throws InterruptedException, IOException {
- // BSP job configuration
- BSPJob bsp = new BSPJob(this.conf);
- // Set the job name
- bsp.setJobName(JOBNAME);
-
- // Set in/output path and formatter
-// bsp.setInputPath(conf, INPUTPATH);
-// bsp.setOutputPath(conf, OUTPUTPATH);
- bsp.setInputFormat(MyInputFormat.class);
- bsp.setOutputFormat(MyOutputFormat.class);
+ class PiEstimator extends BSP {
+ private static final int iterations = 10000;
- // Set the BSP code
- bsp.setBSPCode(MyBSP.class);
- bsp.submit();
-
- //*******************
- // assertion checking
- assertEquals(bsp.getJobName(), JOBNAME);
- //assertEquals(bsp.getInputPath(), INPUTPATH);
- //assertEquals(bsp.getOutputPath(), OUTPUTPATH);
- }
-
- class MyBSP implements BSPInterface {
- // TODO: implement some BSP example
- }
+ public PiEstimator(Configuration conf) throws IOException {
+ super(conf);
+ }
- class MyInputFormat extends InputFormat {
+ public void bsp(BSPPeer peer) throws IOException, KeeperException,
+ InterruptedException {
+ int in = 0, out = 0;
+ for (int i = 0; i < iterations; i++) {
+ double x = 2.0 * Math.random() - 1.0, y = 2.0 * Math.random() - 1.0;
+ if ((Math.sqrt(x * x + y * y) < 1.0)) {
+ in++;
+ } else {
+ out++;
+ }
+ }
+
+ byte[] tagName = Bytes.toBytes(getName().toString());
+ byte[] myData = Bytes.toBytes(4.0 * (double) in / (double) iterations);
+ BSPMessage estimate = new BSPMessage(tagName, myData);
+
+ peer.send(new InetSocketAddress("localhost", 30000), estimate);
+ peer.sync();
+
+ double pi = 0.0;
+ BSPMessage received;
+ while ((received = peer.getCurrentMessage()) != null) {
+ pi = (pi + Bytes.toDouble(received.getData())) / 2;
+ }
- @Override
- public RecordReader createRecordReader(InputSplit arg0,
- TaskAttemptContext arg1) throws IOException, InterruptedException {
- // TODO Auto-generated method stub
- return null;
+ if (pi != 0.0)
+ System.out.println(pi);
}
+ }
- @Override
- public List getSplits(JobContext arg0) throws IOException,
- InterruptedException {
- // TODO Auto-generated method stub
- return null;
+ public void setUp() throws Exception {
+ super.setUp();
+ this.conf = getConf();
+ ZooKeeper zk = new ZooKeeper("localhost:21810", 3000, this);
+ Stat s = null;
+ if (zk != null) {
+ try {
+ s = zk.exists(Constants.DEFAULT_ZOOKEEPER_ROOT, false);
+ } catch (Exception e) {
+ LOG.error(s);
+ }
+
+ if (s == null) {
+ try {
+ zk.create(Constants.DEFAULT_ZOOKEEPER_ROOT, new byte[0],
+ Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+ } catch (KeeperException e) {
+ LOG.error(e);
+ } catch (InterruptedException e) {
+ LOG.error(e);
+ }
+ }
}
- // TODO: implement some input Formatter
}
-
- class MyOutputFormat extends OutputFormat {
- @Override
- public void checkOutputSpecs(JobContext arg0) throws IOException,
- InterruptedException {
- // TODO Auto-generated method stub
-
+ public void testBSPMain() throws InterruptedException, IOException {
+ PiEstimator thread;
+ for (int i = 0; i < NUM_PEER; i++) {
+ conf.set("bsp.peers.num", String.valueOf(NUM_PEER));
+ conf.set(Constants.PEER_HOST, "localhost");
+ conf.set(Constants.PEER_PORT, String.valueOf(30000 + i));
+ conf.set(Constants.ZOOKEEPER_SERVER_ADDRS, "localhost:21810");
+ thread = new PiEstimator(conf);
+ list.add(thread);
}
- @Override
- public OutputCommitter getOutputCommitter(TaskAttemptContext arg0)
- throws IOException, InterruptedException {
- // TODO Auto-generated method stub
- return null;
+ for (int i = 0; i < NUM_PEER; i++) {
+ list.get(i).start();
}
- @Override
- public RecordWriter getRecordWriter(TaskAttemptContext arg0)
- throws IOException, InterruptedException {
- // TODO Auto-generated method stub
- return null;
+ for (int i = 0; i < NUM_PEER; i++) {
+ list.get(i).join();
}
- // TODO: implement some input Formatter
+
+ /*
+ // BSP job configuration
+ BSPJob bsp = new BSPJob(this.conf);
+ // Set the job name
+ bsp.setJobName("bsp test job");
+ bsp.setBspClass(PiEstimator.class);
+ bsp.submit();
+ */
}
@Override
public void process(WatchedEvent event) {
// TODO Auto-generated method stub
-
+
}
}