You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by ed...@apache.org on 2010/06/03 02:58:29 UTC

svn commit: r950840 - in /incubator/hama/trunk: ./ src/java/org/apache/hama/bsp/ src/test/org/apache/hama/bsp/

Author: edwardyoon
Date: Thu Jun  3 00:58:29 2010
New Revision: 950840

URL: http://svn.apache.org/viewvc?rev=950840&view=rev
Log:
Design BSP program code interface, Add BSP interfaces

Added:
    incubator/hama/trunk/src/java/org/apache/hama/bsp/BSP.java
Removed:
    incubator/hama/trunk/src/java/org/apache/hama/bsp/Work.java
Modified:
    incubator/hama/trunk/CHANGES.txt
    incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPInterface.java
    incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPJob.java
    incubator/hama/trunk/src/test/org/apache/hama/bsp/BSPTestDriver.java
    incubator/hama/trunk/src/test/org/apache/hama/bsp/UserInterface.java

Modified: incubator/hama/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/CHANGES.txt?rev=950840&r1=950839&r2=950840&view=diff
==============================================================================
--- incubator/hama/trunk/CHANGES.txt (original)
+++ incubator/hama/trunk/CHANGES.txt Thu Jun  3 00:58:29 2010
@@ -43,6 +43,7 @@ Trunk (unreleased changes)
 
   IMPROVEMENTS
 
+    HAMA-257: Design BSP program code interface (edwardyoon)
     HAMA-261: Remove meaningless unit tests (edwardyoon)
     HAMA-253: Change the blog link url (edwardyoon)
     HAMA-247: Discuss / Refactor HamaMaster and GroomServer (hyunsik)

Added: incubator/hama/trunk/src/java/org/apache/hama/bsp/BSP.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/BSP.java?rev=950840&view=auto
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/BSP.java (added)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/BSP.java Thu Jun  3 00:58:29 2010
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.zookeeper.KeeperException;
+
+/**
+ * This class provides an abstract implementation of the BSP interface
+ */
+public abstract class BSP extends Thread implements BSPInterface {
+  private BSPPeer peer;
+
+  /**
+   * Constructor for abstract class
+   * 
+   * @param conf
+   * @throws IOException
+   */
+  public BSP(Configuration conf) throws IOException {
+    this.peer = new BSPPeer(conf);
+  }
+
+  /**
+   * A thread's run method.
+   * 
+   * The run method performs the
+   * {@link org.apache.hama.bsp.BSPInterface#bsp(BSPPeer)}
+   */
+  public void run() {
+    try {
+      bsp(peer);
+    } catch (IOException e) {
+      // TODO Auto-generated catch block
+      e.printStackTrace();
+    } catch (KeeperException e) {
+      // TODO Auto-generated catch block
+      e.printStackTrace();
+    } catch (InterruptedException e) {
+      // TODO Auto-generated catch block
+      e.printStackTrace();
+    }
+  }
+}

Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPInterface.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPInterface.java?rev=950840&r1=950839&r2=950840&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPInterface.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPInterface.java Thu Jun  3 00:58:29 2010
@@ -1,5 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.hama.bsp;
 
+import java.io.IOException;
+
+import org.apache.zookeeper.KeeperException;
+
+/**
+ * Interface BSP defines the basic operations needed to implement the BSP
+ * algorithm.
+ */
 public interface BSPInterface {
 
+  /**
+   * A user defined function for programming in the BSP style.
+   * 
+   * Applications can use the {@link org.apache.hama.bsp.BSPPeer} to handle the
+   * communication and synchronization between processors.
+   * 
+   * @param peer
+   * @throws IOException
+   * @throws KeeperException
+   * @throws InterruptedException
+   */
+  public void bsp(BSPPeer peer) throws IOException, KeeperException,
+      InterruptedException;
+
 }

Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPJob.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPJob.java?rev=950840&r1=950839&r2=950840&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPJob.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPJob.java Thu Jun  3 00:58:29 2010
@@ -67,10 +67,16 @@ public class BSPJob extends BSPJobContex
     conf.set("bsp.working.dir", dir.toString());
   }
 
-  public void setWorkClass(Class<? extends Work> cls)
+  /**
+   * Set the BSP algorithm class for the job.
+   * 
+   * @param cls
+   * @throws IllegalStateException
+   */
+  public void setBspClass(Class<? extends BSP> cls)
       throws IllegalStateException {
     ensureState(JobState.DEFINE);
-    conf.setClass(WORK_CLASS_ATTR, cls, Work.class);
+    conf.setClass(WORK_CLASS_ATTR, cls, BSP.class);
   }
 
   public void setJar(String jar) {
@@ -126,10 +132,6 @@ public class BSPJob extends BSPJobContex
     conf.setClass(OUTPUT_FORMAT_CLASS_ATTR, cls, OutputFormat.class);    
   }
 
-  public void setBSPCode(Class<? extends BSPInterface> class1) {
-    // TODO Auto-generated method stub    
-  }
-
   public void setUser(String user) {
     conf.set("user.name", user);
   }

Modified: incubator/hama/trunk/src/test/org/apache/hama/bsp/BSPTestDriver.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/test/org/apache/hama/bsp/BSPTestDriver.java?rev=950840&r1=950839&r2=950840&view=diff
==============================================================================
--- incubator/hama/trunk/src/test/org/apache/hama/bsp/BSPTestDriver.java (original)
+++ incubator/hama/trunk/src/test/org/apache/hama/bsp/BSPTestDriver.java Thu Jun  3 00:58:29 2010
@@ -13,8 +13,8 @@ public class BSPTestDriver {
    */
   public static void main(String[] args) throws IOException, InterruptedException {
     BSPJob job = new BSPJob(new HamaConfiguration());
-    job.setJarByClass(Work.class);
-    job.setWorkClass(Work.class);
+    job.setJarByClass(BSP.class);
+    job.setBspClass(BSP.class);
     job.submit();
     Thread.sleep(3000);
 

Modified: incubator/hama/trunk/src/test/org/apache/hama/bsp/UserInterface.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/test/org/apache/hama/bsp/UserInterface.java?rev=950840&r1=950839&r2=950840&view=diff
==============================================================================
--- incubator/hama/trunk/src/test/org/apache/hama/bsp/UserInterface.java (original)
+++ incubator/hama/trunk/src/test/org/apache/hama/bsp/UserInterface.java Thu Jun  3 00:58:29 2010
@@ -1,105 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.hama.bsp;
 
 import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
 import java.util.List;
 
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.InputFormat;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.OutputCommitter;
-import org.apache.hadoop.mapreduce.OutputFormat;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.RecordWriter;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hama.Constants;
 import org.apache.hama.HamaCluster;
 import org.apache.hama.HamaConfiguration;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.data.Stat;
 
 public class UserInterface extends HamaCluster implements Watcher {
   private HamaConfiguration conf;
-  private String JOBNAME = "hama.test.bsp";
-  private Path INPUTPATH = new Path("/tmp/input");
-  private Path OUTPUTPATH = new Path("/tmp/output");
+  private int NUM_PEER = 10;
+  List<PiEstimator> list = new ArrayList<PiEstimator>(NUM_PEER);
   
-  public UserInterface() {
-    this.conf = getConf();
-  }
-
-  public void testScenario() throws InterruptedException, IOException {
-    // BSP job configuration
-    BSPJob bsp = new BSPJob(this.conf);
-    // Set the job name
-    bsp.setJobName(JOBNAME);
-
-    // Set in/output path and formatter
-//    bsp.setInputPath(conf, INPUTPATH);
-//    bsp.setOutputPath(conf, OUTPUTPATH);
-    bsp.setInputFormat(MyInputFormat.class);
-    bsp.setOutputFormat(MyOutputFormat.class);
+  class PiEstimator extends BSP {
+    private static final int iterations = 10000;
 
-    // Set the BSP code
-    bsp.setBSPCode(MyBSP.class);
-    bsp.submit();
-
-    //*******************
-    // assertion checking
-    assertEquals(bsp.getJobName(), JOBNAME);
-    //assertEquals(bsp.getInputPath(), INPUTPATH);
-    //assertEquals(bsp.getOutputPath(), OUTPUTPATH);
-  }
-
-  class MyBSP implements BSPInterface {
-    // TODO: implement some BSP example
-  }
+    public PiEstimator(Configuration conf) throws IOException {
+      super(conf);
+    }
 
-  class MyInputFormat extends InputFormat {
+    public void bsp(BSPPeer peer) throws IOException, KeeperException,
+        InterruptedException {
+      int in = 0, out = 0;
+      for (int i = 0; i < iterations; i++) {
+        double x = 2.0 * Math.random() - 1.0, y = 2.0 * Math.random() - 1.0;
+        if ((Math.sqrt(x * x + y * y) < 1.0)) {
+          in++;
+        } else {
+          out++;
+        }
+      }
+
+      byte[] tagName = Bytes.toBytes(getName().toString());
+      byte[] myData = Bytes.toBytes(4.0 * (double) in / (double) iterations);
+      BSPMessage estimate = new BSPMessage(tagName, myData);
+
+      peer.send(new InetSocketAddress("localhost", 30000), estimate);
+      peer.sync();
+
+      double pi = 0.0;
+      BSPMessage received;
+      while ((received = peer.getCurrentMessage()) != null) {
+        pi = (pi + Bytes.toDouble(received.getData())) / 2;
+      }
 
-    @Override
-    public RecordReader createRecordReader(InputSplit arg0,
-        TaskAttemptContext arg1) throws IOException, InterruptedException {
-      // TODO Auto-generated method stub
-      return null;
+      if (pi != 0.0)
+        System.out.println(pi);
     }
+  }
 
-    @Override
-    public List getSplits(JobContext arg0) throws IOException,
-        InterruptedException {
-      // TODO Auto-generated method stub
-      return null;
+  public void setUp() throws Exception {
+    super.setUp();
+    this.conf = getConf();
+    ZooKeeper zk = new ZooKeeper("localhost:21810", 3000, this);
+    Stat s = null;
+    if (zk != null) {
+      try {
+        s = zk.exists(Constants.DEFAULT_ZOOKEEPER_ROOT, false);
+      } catch (Exception e) {
+        LOG.error(s);
+      }
+
+      if (s == null) {
+        try {
+          zk.create(Constants.DEFAULT_ZOOKEEPER_ROOT, new byte[0],
+              Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+        } catch (KeeperException e) {
+          LOG.error(e);
+        } catch (InterruptedException e) {
+          LOG.error(e);
+        }
+      }
     }
-    // TODO: implement some input Formatter
   }
-  
-  class MyOutputFormat extends OutputFormat {
 
-    @Override
-    public void checkOutputSpecs(JobContext arg0) throws IOException,
-        InterruptedException {
-      // TODO Auto-generated method stub
-      
+  public void testBSPMain() throws InterruptedException, IOException {
+    PiEstimator thread;
+    for (int i = 0; i < NUM_PEER; i++) {
+      conf.set("bsp.peers.num", String.valueOf(NUM_PEER));
+      conf.set(Constants.PEER_HOST, "localhost");
+      conf.set(Constants.PEER_PORT, String.valueOf(30000 + i));
+      conf.set(Constants.ZOOKEEPER_SERVER_ADDRS, "localhost:21810");
+      thread = new PiEstimator(conf);
+      list.add(thread);
     }
 
-    @Override
-    public OutputCommitter getOutputCommitter(TaskAttemptContext arg0)
-        throws IOException, InterruptedException {
-      // TODO Auto-generated method stub
-      return null;
+    for (int i = 0; i < NUM_PEER; i++) {
+      list.get(i).start();
     }
 
-    @Override
-    public RecordWriter getRecordWriter(TaskAttemptContext arg0)
-        throws IOException, InterruptedException {
-      // TODO Auto-generated method stub
-      return null;
+    for (int i = 0; i < NUM_PEER; i++) {
+      list.get(i).join();
     }
-    // TODO: implement some input Formatter
+
+    /*
+    // BSP job configuration
+    BSPJob bsp = new BSPJob(this.conf);
+    // Set the job name
+    bsp.setJobName("bsp test job");
+    bsp.setBspClass(PiEstimator.class);
+    bsp.submit();
+    */
   }
 
   @Override
   public void process(WatchedEvent event) {
     // TODO Auto-generated method stub
-    
+
   }
 }