You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by ed...@apache.org on 2011/04/26 13:12:52 UTC

svn commit: r1096726 - in /incubator/hama/trunk: CHANGES.txt src/java/org/apache/hama/bsp/BSPJob.java src/java/org/apache/hama/bsp/BSPJobClient.java src/java/org/apache/hama/bsp/JobStatus.java src/java/org/apache/hama/bsp/LocalBSPRunner.java

Author: edwardyoon
Date: Tue Apr 26 11:12:51 2011
New Revision: 1096726

URL: http://svn.apache.org/viewvc?rev=1096726&view=rev
Log:
Add LocalBSPRunner

Added:
    incubator/hama/trunk/src/java/org/apache/hama/bsp/LocalBSPRunner.java
Modified:
    incubator/hama/trunk/CHANGES.txt
    incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPJob.java
    incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPJobClient.java
    incubator/hama/trunk/src/java/org/apache/hama/bsp/JobStatus.java

Modified: incubator/hama/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/CHANGES.txt?rev=1096726&r1=1096725&r2=1096726&view=diff
==============================================================================
--- incubator/hama/trunk/CHANGES.txt (original)
+++ incubator/hama/trunk/CHANGES.txt Tue Apr 26 11:12:51 2011
@@ -4,6 +4,8 @@ Release 0.3 - Unreleased
 
   NEW FEATURES
 
+    HAMA-374: Add LocalBSPRunner (Thomas Jungblut via edwardyoon)
+
   BUG FIXES
 
   IMPROVEMENTS

Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPJob.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPJob.java?rev=1096726&r1=1096725&r2=1096726&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPJob.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPJob.java Tue Apr 26 11:12:51 2011
@@ -28,7 +28,8 @@ import org.apache.hama.HamaConfiguration
 /**
  * A BSP job configuration.
  * 
- * BSPJob is the primary interface for a user to describe a BSP job to the Hama BSP framework for execution.
+ * BSPJob is the primary interface for a user to describe a BSP job to the Hama
+ * BSP framework for execution.
  */
 public class BSPJob extends BSPJobContext {
   public static enum JobState {
@@ -57,7 +58,8 @@ public class BSPJob extends BSPJobContex
     super(new Path(jobFile), jobID);
   }
 
-  public BSPJob(HamaConfiguration conf, Class<?> exampleClass) throws IOException {
+  public BSPJob(HamaConfiguration conf, Class<?> exampleClass)
+      throws IOException {
     this(conf);
     setJarByClass(exampleClass);
   }

Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPJobClient.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPJobClient.java?rev=1096726&r1=1096725&r2=1096726&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPJobClient.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPJobClient.java Tue Apr 26 11:12:51 2011
@@ -197,10 +197,16 @@ public class BSPJobClient extends Config
   }
 
   public void init(Configuration conf) throws IOException {
-    this.jobSubmitClient = (JobSubmissionProtocol) RPC.getProxy(
-        JobSubmissionProtocol.class, JobSubmissionProtocol.versionID,
-        BSPMaster.getAddress(conf), conf,
-        NetUtils.getSocketFactory(conf, JobSubmissionProtocol.class));
+    String masterAdress = conf.get("bsp.master.address");
+    if (masterAdress != null && !masterAdress.equals("local")) {
+      this.jobSubmitClient = (JobSubmissionProtocol) RPC.getProxy(
+          JobSubmissionProtocol.class, JobSubmissionProtocol.versionID,
+          BSPMaster.getAddress(conf), conf,
+          NetUtils.getSocketFactory(conf, JobSubmissionProtocol.class));
+    } else {
+      LOG.debug("Using local BSP runner.");
+      this.jobSubmitClient = new LocalBSPRunner(conf);
+    }
   }
 
   /**

Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/JobStatus.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/JobStatus.java?rev=1096726&r1=1096725&r2=1096726&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/JobStatus.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/JobStatus.java Tue Apr 26 11:12:51 2011
@@ -153,6 +153,10 @@ public class JobStatus implements Writab
     return superstepCount;
   }
   
+  public synchronized void setSuperstepCount(long superstepCount) {
+    this.superstepCount = superstepCount;
+  }
+  
   public synchronized void setStartTime(long startTime) {
     this.startTime = startTime;
   }

Added: incubator/hama/trunk/src/java/org/apache/hama/bsp/LocalBSPRunner.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/LocalBSPRunner.java?rev=1096726&view=auto
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/LocalBSPRunner.java (added)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/LocalBSPRunner.java Tue Apr 26 11:12:51 2011
@@ -0,0 +1,380 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.BrokenBarrierException;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadPoolExecutor;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hama.bsp.BSPMaster.State;
+import org.apache.hama.ipc.JobSubmissionProtocol;
+import org.apache.zookeeper.KeeperException;
+
+/**
+ * A multithreaded local BSP runner that can be used for debugging BSP's. It
+ * uses the working directory "/user/hama/bsp/" and starts runners based on the
+ * number of the machines core.
+ * 
+ */
+public class LocalBSPRunner implements JobSubmissionProtocol {
+
+  private static final String IDENTIFIER = "localrunner";
+  private static String WORKING_DIR = "/user/hama/bsp/";
+  protected static volatile ThreadPoolExecutor threadPool;
+  protected static final int threadPoolSize;
+  protected static final LinkedList<Future<BSP>> futureList = new LinkedList<Future<BSP>>();
+  protected static CyclicBarrier barrier;
+
+  static {
+    threadPoolSize = Runtime.getRuntime().availableProcessors();
+    barrier = new CyclicBarrier(threadPoolSize);
+    threadPool = (ThreadPoolExecutor) Executors
+        .newFixedThreadPool(threadPoolSize);
+  }
+
+  protected HashMap<String, BSPPeerProtocol> localGrooms = new HashMap<String, BSPPeerProtocol>();
+  protected String jobFile;
+  protected String jobName;
+
+  protected JobStatus currentJobStatus;
+
+  protected Configuration conf;
+  protected FileSystem fs;
+
+  {
+    for (int i = 0; i < threadPoolSize; i++) {
+      String name = IDENTIFIER + " " + i;
+      localGrooms.put(name, new LocalGroom(name));
+    }
+  }
+
+  public LocalBSPRunner(Configuration conf) throws IOException {
+    super();
+    this.conf = conf;
+    this.fs = FileSystem.get(conf);
+    String path = conf.get("bsp.local.dir");
+    if (path != null && !path.isEmpty())
+      WORKING_DIR = path;
+  }
+
+  @Override
+  public long getProtocolVersion(String protocol, long clientVersion)
+      throws IOException {
+    return 3;
+  }
+
+  @Override
+  public BSPJobID getNewJobId() throws IOException {
+    return new BSPJobID(IDENTIFIER, 1);
+  }
+
+  @Override
+  public JobStatus submitJob(BSPJobID jobID, String jobFile) throws IOException {
+    this.jobFile = jobFile;
+    BSPJob job = new BSPJob(jobID, jobFile);
+    job.setNumBspTask(threadPoolSize);
+    this.jobName = job.getJobName();
+    currentJobStatus = new JobStatus(jobID, System.getProperty("user.name"), 0,
+        JobStatus.RUNNING);
+    for (int i = 0; i < threadPoolSize; i++) {
+      String name = IDENTIFIER + " " + i;
+      BSPPeerProtocol localGroom = new LocalGroom(name);
+      localGrooms.put(name, localGroom);
+      futureList.add(threadPool.submit(new BSPRunner(conf, job, ReflectionUtils
+          .newInstance(job.getBspClass(), conf), localGroom)));
+    }
+    new Thread(new ThreadObserver(currentJobStatus)).start();
+    return currentJobStatus;
+  }
+
+  @Override
+  public ClusterStatus getClusterStatus(boolean detailed) throws IOException {
+    Map<String, String> map = new HashMap<String, String>();
+    for (Entry<String, BSPPeerProtocol> entry : localGrooms.entrySet()) {
+      map.put(entry.getKey(), entry.getValue().getPeerName());
+    }
+    return new ClusterStatus(map, 0, 1, State.RUNNING);
+  }
+
+  @Override
+  public JobProfile getJobProfile(BSPJobID jobid) throws IOException {
+    return new JobProfile(System.getProperty("user.name"), jobid, jobFile,
+        jobName);
+  }
+
+  @Override
+  public JobStatus getJobStatus(BSPJobID jobid) throws IOException {
+    if (currentJobStatus == null) {
+      currentJobStatus = new JobStatus(jobid, System.getProperty("user.name"),
+          0L, JobStatus.RUNNING);
+    }
+    return currentJobStatus;
+  }
+
+  @Override
+  public String getFilesystemName() throws IOException {
+    return fs.getUri().toString();
+  }
+
+  @Override
+  public JobStatus[] jobsToComplete() throws IOException {
+    return null;
+  }
+
+  @Override
+  public JobStatus[] getAllJobs() throws IOException {
+    return null;
+  }
+
+  @Override
+  public String getSystemDir() {
+    return WORKING_DIR;
+  }
+
+  @Override
+  public void killJob(BSPJobID jobid) throws IOException {
+    return;
+  }
+
+  @Override
+  public boolean killTask(TaskAttemptID taskId, boolean shouldFail)
+      throws IOException {
+    return false;
+  }
+
+  // this class will spawn a new thread and executes the BSP
+  class BSPRunner implements Callable<BSP> {
+
+    Configuration conf;
+    BSPJob job;
+    BSP bsp;
+    BSPPeerProtocol groom;
+
+    public BSPRunner(Configuration conf, BSPJob job, BSP bsp,
+        BSPPeerProtocol groom) {
+      super();
+      this.conf = conf;
+      this.job = job;
+      this.bsp = bsp;
+      this.groom = groom;
+    }
+
+    public void run() {
+      bsp.setConf(conf);
+      try {
+        bsp.bsp(groom);
+      } catch (Exception e) {
+        e.printStackTrace();
+      }
+    }
+
+    @Override
+    public BSP call() throws Exception {
+      run();
+      return bsp;
+    }
+  }
+
+  // this thread observes the status of the runners.
+  class ThreadObserver implements Runnable {
+
+    JobStatus status;
+
+    public ThreadObserver(JobStatus currentJobStatus) {
+      this.status = currentJobStatus;
+    }
+
+    @Override
+    public void run() {
+      boolean success = true;
+      for (Future<BSP> future : futureList) {
+        try {
+          future.get();
+        } catch (InterruptedException e) {
+          e.printStackTrace();
+          success = false;
+        } catch (ExecutionException e) {
+          e.printStackTrace();
+          success = false;
+        }
+      }
+      if (success) {
+        currentJobStatus.setState(JobStatus.State.SUCCEEDED);
+        currentJobStatus.setRunState(JobStatus.SUCCEEDED);
+      } else {
+        currentJobStatus.setState(JobStatus.State.FAILED);
+        currentJobStatus.setRunState(JobStatus.FAILED);
+      }
+      threadPool.shutdownNow();
+    }
+
+  }
+
+  class LocalGroom implements BSPPeerProtocol {
+    private static final String FIRST_THREAD_NAME = "pool-1-thread-1";
+    private long superStepCount = 0;
+    private final ConcurrentLinkedQueue<BSPMessage> localMessageQueue = new ConcurrentLinkedQueue<BSPMessage>();
+    // outgoing queue
+    private final Map<String, ConcurrentLinkedQueue<BSPMessage>> outgoingQueues = new ConcurrentHashMap<String, ConcurrentLinkedQueue<BSPMessage>>();
+    private final String peerName;
+
+    public LocalGroom(String peerName) {
+      super();
+      this.peerName = peerName;
+    }
+
+    @Override
+    public void send(String peerName, BSPMessage msg) throws IOException {
+      if (this.peerName.equals(peerName)) {
+        put(msg);
+      } else {
+        // put this into a outgoing queue
+        if (outgoingQueues.get(peerName) == null) {
+          outgoingQueues.put(peerName, new ConcurrentLinkedQueue<BSPMessage>());
+        }
+        outgoingQueues.get(peerName).add(msg);
+      }
+    }
+
+    @Override
+    public void put(BSPMessage msg) throws IOException {
+      localMessageQueue.add(msg);
+    }
+
+    @Override
+    public BSPMessage getCurrentMessage() throws IOException {
+      return localMessageQueue.poll();
+    }
+
+    @Override
+    public int getNumCurrentMessages() {
+      return localMessageQueue.size();
+    }
+
+    @Override
+    public void sync() throws IOException, KeeperException,
+        InterruptedException {
+      // wait until all threads reach this barrier
+      barrierSync();
+      // send the messages
+      for (Entry<String, ConcurrentLinkedQueue<BSPMessage>> entry : outgoingQueues
+          .entrySet()) {
+        String peerName = entry.getKey();
+        for (BSPMessage msg : entry.getValue())
+          localGrooms.get(peerName).put(msg);
+      }
+      // clear the local outgoing queue
+      outgoingQueues.clear();
+      // sync again to avoid data inconsistency
+      barrierSync();
+      incrementSuperSteps();
+    }
+
+    private void barrierSync() throws InterruptedException {
+      try {
+        barrier.await();
+      } catch (BrokenBarrierException e) {
+        throw new InterruptedException("Barrier has been broken!" + e);
+      }
+    }
+
+    private void incrementSuperSteps() {
+      // just let the first thread set the supersteps
+      if (Thread.currentThread().getName().equals(FIRST_THREAD_NAME)) {
+        currentJobStatus.setprogress(superStepCount++);
+        currentJobStatus.setSuperstepCount(currentJobStatus.progress());
+      }
+    }
+
+    @Override
+    public long getSuperstepCount() {
+      return superStepCount;
+    }
+
+    @Override
+    public String getPeerName() {
+      return peerName;
+    }
+
+    @Override
+    public String[] getAllPeerNames() {
+      return localGrooms.keySet().toArray(
+          new String[localGrooms.keySet().size()]);
+    }
+
+    @Override
+    public void clear() {
+      localMessageQueue.clear();
+    }
+
+    @Override
+    public long getProtocolVersion(String protocol, long clientVersion)
+        throws IOException {
+      return 3;
+    }
+
+    @Override
+    public void close() throws IOException {
+
+    }
+
+    @Override
+    public Task getTask(TaskAttemptID taskid) throws IOException {
+      return null;
+    }
+
+    @Override
+    public boolean ping(TaskAttemptID taskid) throws IOException {
+      return true;
+    }
+
+    @Override
+    public void done(TaskAttemptID taskid, boolean shouldBePromoted)
+        throws IOException {
+
+    }
+
+    @Override
+    public void fsError(TaskAttemptID taskId, String message)
+        throws IOException {
+
+    }
+
+    @Override
+    public void put(BSPMessageBundle messages) throws IOException {
+      throw new UnsupportedOperationException(
+          "Messagebundle is not supported by local testing");
+    }
+
+  }
+
+}