You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by ed...@apache.org on 2011/04/26 13:12:52 UTC
svn commit: r1096726 - in /incubator/hama/trunk: CHANGES.txt
src/java/org/apache/hama/bsp/BSPJob.java
src/java/org/apache/hama/bsp/BSPJobClient.java
src/java/org/apache/hama/bsp/JobStatus.java
src/java/org/apache/hama/bsp/LocalBSPRunner.java
Author: edwardyoon
Date: Tue Apr 26 11:12:51 2011
New Revision: 1096726
URL: http://svn.apache.org/viewvc?rev=1096726&view=rev
Log:
Add LocalBSPRunner
Added:
incubator/hama/trunk/src/java/org/apache/hama/bsp/LocalBSPRunner.java
Modified:
incubator/hama/trunk/CHANGES.txt
incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPJob.java
incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPJobClient.java
incubator/hama/trunk/src/java/org/apache/hama/bsp/JobStatus.java
Modified: incubator/hama/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/CHANGES.txt?rev=1096726&r1=1096725&r2=1096726&view=diff
==============================================================================
--- incubator/hama/trunk/CHANGES.txt (original)
+++ incubator/hama/trunk/CHANGES.txt Tue Apr 26 11:12:51 2011
@@ -4,6 +4,8 @@ Release 0.3 - Unreleased
NEW FEATURES
+ HAMA-374: Add LocalBSPRunner (Thomas Jungblut via edwardyoon)
+
BUG FIXES
IMPROVEMENTS
Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPJob.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPJob.java?rev=1096726&r1=1096725&r2=1096726&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPJob.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPJob.java Tue Apr 26 11:12:51 2011
@@ -28,7 +28,8 @@ import org.apache.hama.HamaConfiguration
/**
* A BSP job configuration.
*
- * BSPJob is the primary interface for a user to describe a BSP job to the Hama BSP framework for execution.
+ * BSPJob is the primary interface for a user to describe a BSP job to the Hama
+ * BSP framework for execution.
*/
public class BSPJob extends BSPJobContext {
public static enum JobState {
@@ -57,7 +58,8 @@ public class BSPJob extends BSPJobContex
super(new Path(jobFile), jobID);
}
- public BSPJob(HamaConfiguration conf, Class<?> exampleClass) throws IOException {
+ public BSPJob(HamaConfiguration conf, Class<?> exampleClass)
+ throws IOException {
+ // Delegate to the single-argument constructor, then call
+ // setJarByClass with the supplied class.
this(conf);
setJarByClass(exampleClass);
}
Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPJobClient.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPJobClient.java?rev=1096726&r1=1096725&r2=1096726&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPJobClient.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPJobClient.java Tue Apr 26 11:12:51 2011
@@ -197,10 +197,16 @@ public class BSPJobClient extends Config
}
public void init(Configuration conf) throws IOException {
- this.jobSubmitClient = (JobSubmissionProtocol) RPC.getProxy(
- JobSubmissionProtocol.class, JobSubmissionProtocol.versionID,
- BSPMaster.getAddress(conf), conf,
- NetUtils.getSocketFactory(conf, JobSubmissionProtocol.class));
+ // "bsp.master.address" selects the submission backend: any value other
+ // than "local" creates an RPC proxy to the BSPMaster; a missing value or
+ // the literal "local" falls back to the in-process LocalBSPRunner.
+ String masterAdress = conf.get("bsp.master.address");
+ if (masterAdress != null && !masterAdress.equals("local")) {
+ this.jobSubmitClient = (JobSubmissionProtocol) RPC.getProxy(
+ JobSubmissionProtocol.class, JobSubmissionProtocol.versionID,
+ BSPMaster.getAddress(conf), conf,
+ NetUtils.getSocketFactory(conf, JobSubmissionProtocol.class));
+ } else {
+ LOG.debug("Using local BSP runner.");
+ this.jobSubmitClient = new LocalBSPRunner(conf);
+ }
}
/**
Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/JobStatus.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/JobStatus.java?rev=1096726&r1=1096725&r2=1096726&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/JobStatus.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/JobStatus.java Tue Apr 26 11:12:51 2011
@@ -153,6 +153,10 @@ public class JobStatus implements Writab
return superstepCount;
}
+ /**
+ * Replaces the stored superstep count.
+ *
+ * @param superstepCount the new superstep count
+ */
+ public synchronized void setSuperstepCount(long superstepCount) {
+ this.superstepCount = superstepCount;
+ }
+
public synchronized void setStartTime(long startTime) {
this.startTime = startTime;
}
Added: incubator/hama/trunk/src/java/org/apache/hama/bsp/LocalBSPRunner.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/LocalBSPRunner.java?rev=1096726&view=auto
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/LocalBSPRunner.java (added)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/LocalBSPRunner.java Tue Apr 26 11:12:51 2011
@@ -0,0 +1,380 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.BrokenBarrierException;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadPoolExecutor;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hama.bsp.BSPMaster.State;
+import org.apache.hama.ipc.JobSubmissionProtocol;
+import org.apache.zookeeper.KeeperException;
+
+/**
+ * A multithreaded local BSP runner that can be used for debugging BSPs. It
+ * uses the working directory "/user/hama/bsp/" and starts runners based on the
+ * number of the machine's cores.
+ *
+ */
+public class LocalBSPRunner implements JobSubmissionProtocol {
+
+ // Prefix of every local peer name ("localrunner <i>") and the identifier
+ // passed to BSPJobID in getNewJobId().
+ private static final String IDENTIFIER = "localrunner";
+ // Value returned by getSystemDir(). Static, so it is shared by all runner
+ // instances; the constructor overwrites it when "bsp.local.dir" is set.
+ private static String WORKING_DIR = "/user/hama/bsp/";
+ // Pool that executes the BSPRunner tasks. It is created once in the static
+ // initializer below and is shut down by ThreadObserver after a job ends.
+ protected static volatile ThreadPoolExecutor threadPool;
+ // Number of pool threads, barrier parties and BSP tasks per job.
+ protected static final int threadPoolSize;
+ // Futures of all submitted BSPRunner tasks; ThreadObserver waits on them.
+ // NOTE(review): static and never cleared in the code visible here.
+ protected static final LinkedList<Future<BSP>> futureList = new LinkedList<Future<BSP>>();
+ // Barrier awaited by LocalGroom.sync(); it has threadPoolSize parties.
+ protected static CyclicBarrier barrier;
+
+ // One thread, one barrier party and (in submitJob) one BSP task per
+ // available processor.
+ static {
+ threadPoolSize = Runtime.getRuntime().availableProcessors();
+ barrier = new CyclicBarrier(threadPoolSize);
+ threadPool = (ThreadPoolExecutor) Executors
+ .newFixedThreadPool(threadPoolSize);
+ }
+
+ // Peer name -> peer. Filled by the instance initializer below and
+ // overwritten with fresh LocalGroom instances by submitJob().
+ protected HashMap<String, BSPPeerProtocol> localGrooms = new HashMap<String, BSPPeerProtocol>();
+ // Job file and job name of the most recently submitted job; both are
+ // null until submitJob() has been called.
+ protected String jobFile;
+ protected String jobName;
+
+ // Status of the current job; created by submitJob() or, if still null,
+ // lazily by getJobStatus().
+ protected JobStatus currentJobStatus;
+
+ // Configuration passed to the constructor and the file system obtained
+ // from it.
+ protected Configuration conf;
+ protected FileSystem fs;
+
+ // Registers one LocalGroom per pool thread under the name
+ // "localrunner <i>" so that peers exist before any job is submitted.
+ {
+ for (int i = 0; i < threadPoolSize; i++) {
+ String name = IDENTIFIER + " " + i;
+ localGrooms.put(name, new LocalGroom(name));
+ }
+ }
+
+  /**
+   * Creates a local runner bound to the given configuration.
+   *
+   * The file system is resolved from {@code conf}. If "bsp.local.dir" holds a
+   * non-empty value, it replaces the (static) working directory returned by
+   * {@link #getSystemDir()}.
+   *
+   * @param conf configuration used for the file system and the BSP tasks
+   * @throws IOException if the file system cannot be obtained
+   */
+  public LocalBSPRunner(Configuration conf) throws IOException {
+    super();
+    this.conf = conf;
+    this.fs = FileSystem.get(conf);
+    final String configuredDir = conf.get("bsp.local.dir");
+    final boolean hasConfiguredDir = configuredDir != null
+        && !configuredDir.isEmpty();
+    if (hasConfiguredDir) {
+      WORKING_DIR = configuredDir;
+    }
+  }
+
+ /**
+ * Returns the protocol version of this local implementation. Both
+ * arguments are ignored; the result is always 3.
+ */
+ @Override
+ public long getProtocolVersion(String protocol, long clientVersion)
+ throws IOException {
+ return 3;
+ }
+
+ /**
+ * Returns a job id built from IDENTIFIER and the fixed number 1, so every
+ * call constructs the id from the same two arguments.
+ */
+ @Override
+ public BSPJobID getNewJobId() throws IOException {
+ return new BSPJobID(IDENTIFIER, 1);
+ }
+
+  /**
+   * Starts the given job inside this JVM.
+   *
+   * One {@link LocalGroom} and one BSP instance are created per pool thread;
+   * each pair is wrapped in a {@link BSPRunner} and handed to the thread pool.
+   * A separate observer thread waits for all runners and updates the returned
+   * status once they have finished.
+   *
+   * @param jobID id of the job to run
+   * @param jobFile path of the job file used to construct the BSPJob
+   * @return the status object of the job, initially in state RUNNING
+   * @throws IOException if the job cannot be constructed
+   */
+  @Override
+  public JobStatus submitJob(BSPJobID jobID, String jobFile) throws IOException {
+    this.jobFile = jobFile;
+    final BSPJob bspJob = new BSPJob(jobID, jobFile);
+    bspJob.setNumBspTask(threadPoolSize);
+    this.jobName = bspJob.getJobName();
+
+    final String user = System.getProperty("user.name");
+    currentJobStatus = new JobStatus(jobID, user, 0, JobStatus.RUNNING);
+
+    int peerIndex = 0;
+    while (peerIndex < threadPoolSize) {
+      final String peerName = IDENTIFIER + " " + peerIndex;
+      final BSPPeerProtocol peer = new LocalGroom(peerName);
+      // replaces the groom registered under the same name earlier
+      localGrooms.put(peerName, peer);
+      final BSP task = ReflectionUtils.newInstance(bspJob.getBspClass(), conf);
+      futureList.add(threadPool.submit(new BSPRunner(conf, bspJob, task, peer)));
+      peerIndex++;
+    }
+
+    final Thread observer = new Thread(new ThreadObserver(currentJobStatus));
+    observer.start();
+    return currentJobStatus;
+  }
+
+  /**
+   * Builds a cluster status that lists every local groom.
+   *
+   * @param detailed ignored by the local runner
+   * @return a status in state RUNNING mapping each registered groom name to
+   *         the peer name reported by that groom
+   */
+  @Override
+  public ClusterStatus getClusterStatus(boolean detailed) throws IOException {
+    final Map<String, String> peerNamesByGroom = new HashMap<String, String>();
+    for (String groomName : localGrooms.keySet()) {
+      final BSPPeerProtocol groom = localGrooms.get(groomName);
+      peerNamesByGroom.put(groomName, groom.getPeerName());
+    }
+    return new ClusterStatus(peerNamesByGroom, 0, 1, State.RUNNING);
+  }
+
+  /**
+   * Returns a profile for the given job id, filled with the current system
+   * user and the job file and job name remembered from the last submitJob
+   * call.
+   */
+  @Override
+  public JobProfile getJobProfile(BSPJobID jobid) throws IOException {
+    final String user = System.getProperty("user.name");
+    return new JobProfile(user, jobid, jobFile, jobName);
+  }
+
+  /**
+   * Returns the status of the current job. If no status exists yet, a new
+   * one in state RUNNING is created for the given id and remembered.
+   */
+  @Override
+  public JobStatus getJobStatus(BSPJobID jobid) throws IOException {
+    if (currentJobStatus != null) {
+      return currentJobStatus;
+    }
+    final String user = System.getProperty("user.name");
+    currentJobStatus = new JobStatus(jobid, user, 0L, JobStatus.RUNNING);
+    return currentJobStatus;
+  }
+
+ /**
+ * Returns the URI of the file system obtained in the constructor, as a
+ * string.
+ */
+ @Override
+ public String getFilesystemName() throws IOException {
+ return fs.getUri().toString();
+ }
+
+ /**
+ * Not implemented by the local runner: always returns null, so callers
+ * must be prepared for a null result.
+ */
+ @Override
+ public JobStatus[] jobsToComplete() throws IOException {
+ return null;
+ }
+
+ /**
+ * Not implemented by the local runner: always returns null, so callers
+ * must be prepared for a null result.
+ */
+ @Override
+ public JobStatus[] getAllJobs() throws IOException {
+ return null;
+ }
+
+ /**
+ * Returns the static working directory: the built-in default unless a
+ * constructor call replaced it with the "bsp.local.dir" setting.
+ */
+ @Override
+ public String getSystemDir() {
+ return WORKING_DIR;
+ }
+
+  /**
+   * Killing jobs is not supported by the local runner; this method does
+   * nothing.
+   */
+  @Override
+  public void killJob(BSPJobID jobid) throws IOException {
+    // intentionally empty
+  }
+
+ /**
+ * Killing tasks is not supported by the local runner: both arguments are
+ * ignored and the result is always false.
+ */
+ @Override
+ public boolean killTask(TaskAttemptID taskId, boolean shouldFail)
+ throws IOException {
+ return false;
+ }
+
+  /**
+   * Task that executes one BSP instance against one local groom. It does not
+   * start a thread itself; submitJob hands it to the shared thread pool.
+   */
+  class BSPRunner implements Callable<BSP> {
+
+    Configuration conf;
+    BSPJob job;
+    BSP bsp;
+    BSPPeerProtocol groom;
+
+    /**
+     * @param conf configuration handed to the BSP before it runs
+     * @param job the job this runner belongs to
+     * @param bsp the BSP instance to execute
+     * @param groom the peer passed to {@code bsp.bsp(...)}
+     */
+    public BSPRunner(Configuration conf, BSPJob job, BSP bsp,
+        BSPPeerProtocol groom) {
+      super();
+      this.conf = conf;
+      this.job = job;
+      this.bsp = bsp;
+      this.groom = groom;
+    }
+
+    /**
+     * Runs the BSP and only prints a stack trace if it fails. Kept for
+     * callers that invoke the runner directly and do not want exceptions.
+     */
+    public void run() {
+      bsp.setConf(conf);
+      try {
+        bsp.bsp(groom);
+      } catch (Exception e) {
+        e.printStackTrace();
+      }
+    }
+
+    /**
+     * Runs the BSP and lets any failure propagate, so that the Future
+     * returned by the thread pool completes exceptionally and the job can be
+     * reported as FAILED instead of always looking successful.
+     *
+     * @return the executed BSP instance
+     * @throws Exception whatever the BSP implementation throws
+     */
+    @Override
+    public BSP call() throws Exception {
+      bsp.setConf(conf);
+      bsp.bsp(groom);
+      return bsp;
+    }
+  }
+
+  /**
+   * Waits for all submitted runners and then records the outcome in the job
+   * status it was created with. Afterwards the shared thread pool is shut
+   * down.
+   */
+  class ThreadObserver implements Runnable {
+
+    JobStatus status;
+
+    /**
+     * @param currentJobStatus the status object to update when the runners
+     *          have finished
+     */
+    public ThreadObserver(JobStatus currentJobStatus) {
+      this.status = currentJobStatus;
+    }
+
+    @Override
+    public void run() {
+      boolean success = true;
+      for (Future<BSP> future : futureList) {
+        try {
+          future.get();
+        } catch (InterruptedException e) {
+          e.printStackTrace();
+          success = false;
+          // keep the interrupt visible and stop waiting; the pool is shut
+          // down below, which cancels the runners that are still active
+          Thread.currentThread().interrupt();
+          break;
+        } catch (ExecutionException e) {
+          e.printStackTrace();
+          success = false;
+        }
+      }
+      // Update the status this observer was created for rather than the
+      // runner's mutable currentJobStatus field, which a later submitJob call
+      // may have replaced in the meantime.
+      if (success) {
+        status.setState(JobStatus.State.SUCCEEDED);
+        status.setRunState(JobStatus.SUCCEEDED);
+      } else {
+        status.setState(JobStatus.State.FAILED);
+        status.setRunState(JobStatus.FAILED);
+      }
+      threadPool.shutdownNow();
+    }
+
+  }
+
+  /**
+   * In-process peer used by the local runner. Messages for other peers are
+   * collected in per-target outgoing queues and handed over during
+   * {@link #sync()}; messages addressed to this peer go straight into its own
+   * queue.
+   */
+  class LocalGroom implements BSPPeerProtocol {
+    // number of completed sync() calls of this peer
+    private long superStepCount = 0;
+    // messages delivered to this peer; drained by getCurrentMessage()
+    private final ConcurrentLinkedQueue<BSPMessage> localMessageQueue = new ConcurrentLinkedQueue<BSPMessage>();
+    // outgoing queue
+    private final Map<String, ConcurrentLinkedQueue<BSPMessage>> outgoingQueues = new ConcurrentHashMap<String, ConcurrentLinkedQueue<BSPMessage>>();
+    private final String peerName;
+    // Only the groom named "localrunner 0" publishes progress to the job
+    // status. Selecting the reporter by peer name instead of by the pool
+    // thread's default name keeps this working when the pool is not the first
+    // default-factory pool created in the JVM.
+    private final boolean reportsProgress;
+
+    /**
+     * @param peerName the name under which this peer is addressed
+     */
+    public LocalGroom(String peerName) {
+      super();
+      this.peerName = peerName;
+      this.reportsProgress = (IDENTIFIER + " 0").equals(peerName);
+    }
+
+    /**
+     * Queues a message for the named peer. A message to this peer itself is
+     * delivered immediately; any other message waits for the next sync().
+     */
+    @Override
+    public void send(String peerName, BSPMessage msg) throws IOException {
+      if (this.peerName.equals(peerName)) {
+        put(msg);
+        return;
+      }
+      // put this into a outgoing queue
+      ConcurrentLinkedQueue<BSPMessage> queue = outgoingQueues.get(peerName);
+      if (queue == null) {
+        queue = new ConcurrentLinkedQueue<BSPMessage>();
+        outgoingQueues.put(peerName, queue);
+      }
+      queue.add(msg);
+    }
+
+    /** Appends a message to this peer's incoming queue. */
+    @Override
+    public void put(BSPMessage msg) throws IOException {
+      localMessageQueue.add(msg);
+    }
+
+    /** Removes and returns the next incoming message, or null if none. */
+    @Override
+    public BSPMessage getCurrentMessage() throws IOException {
+      return localMessageQueue.poll();
+    }
+
+    /** Returns the number of messages currently in the incoming queue. */
+    @Override
+    public int getNumCurrentMessages() {
+      return localMessageQueue.size();
+    }
+
+    /**
+     * Ends the current superstep: waits for all peers, delivers the queued
+     * outgoing messages, waits again and then counts the superstep.
+     *
+     * @throws IOException if a message is addressed to an unknown peer
+     * @throws InterruptedException if waiting at the barrier is interrupted
+     *           or the barrier is broken
+     */
+    @Override
+    public void sync() throws IOException, KeeperException,
+        InterruptedException {
+      // wait until all threads reach this barrier
+      barrierSync();
+      // send the messages
+      for (Entry<String, ConcurrentLinkedQueue<BSPMessage>> entry : outgoingQueues
+          .entrySet()) {
+        final String target = entry.getKey();
+        final BSPPeerProtocol targetGroom = localGrooms.get(target);
+        if (targetGroom == null) {
+          // fail with a readable message instead of a bare NPE
+          throw new IOException("Unknown peer: " + target);
+        }
+        for (BSPMessage msg : entry.getValue()) {
+          targetGroom.put(msg);
+        }
+      }
+      // clear the local outgoing queue
+      outgoingQueues.clear();
+      // sync again to avoid data inconsistency
+      barrierSync();
+      incrementSuperSteps();
+    }
+
+    /**
+     * Waits at the shared barrier. A broken barrier is reported as an
+     * InterruptedException that carries the original exception as its cause.
+     */
+    private void barrierSync() throws InterruptedException {
+      try {
+        barrier.await();
+      } catch (BrokenBarrierException e) {
+        final InterruptedException wrapped = new InterruptedException(
+            "Barrier has been broken!" + e);
+        wrapped.initCause(e);
+        throw wrapped;
+      }
+    }
+
+    /**
+     * Counts the finished superstep for this peer; the reporting groom also
+     * publishes the value to the job status.
+     */
+    private void incrementSuperSteps() {
+      // every peer counts its own supersteps so that getSuperstepCount() is
+      // correct on all peers, not only on the reporting one
+      final long countBeforeThisStep = superStepCount++;
+      if (reportsProgress) {
+        // NOTE(review): as before, the published value is the count before
+        // this superstep was added - confirm whether that is intended.
+        currentJobStatus.setprogress(countBeforeThisStep);
+        currentJobStatus.setSuperstepCount(currentJobStatus.progress());
+      }
+    }
+
+    @Override
+    public long getSuperstepCount() {
+      return superStepCount;
+    }
+
+    @Override
+    public String getPeerName() {
+      return peerName;
+    }
+
+    /** Returns the names of all grooms registered with the runner. */
+    @Override
+    public String[] getAllPeerNames() {
+      return localGrooms.keySet().toArray(new String[localGrooms.size()]);
+    }
+
+    /** Discards all messages in the incoming queue. */
+    @Override
+    public void clear() {
+      localMessageQueue.clear();
+    }
+
+    @Override
+    public long getProtocolVersion(String protocol, long clientVersion)
+        throws IOException {
+      return 3;
+    }
+
+    /** Nothing to release for a local groom. */
+    @Override
+    public void close() throws IOException {
+
+    }
+
+    /** Tasks are not tracked locally; always returns null. */
+    @Override
+    public Task getTask(TaskAttemptID taskid) throws IOException {
+      return null;
+    }
+
+    /** Always returns true. */
+    @Override
+    public boolean ping(TaskAttemptID taskid) throws IOException {
+      return true;
+    }
+
+    /** No-op for the local runner. */
+    @Override
+    public void done(TaskAttemptID taskid, boolean shouldBePromoted)
+        throws IOException {
+
+    }
+
+    /** No-op for the local runner. */
+    @Override
+    public void fsError(TaskAttemptID taskId, String message)
+        throws IOException {
+
+    }
+
+    /**
+     * Message bundles are not supported locally.
+     *
+     * @throws UnsupportedOperationException always
+     */
+    @Override
+    public void put(BSPMessageBundle messages) throws IOException {
+      throw new UnsupportedOperationException(
+          "Messagebundle is not supported by local testing");
+    }
+
+  }
+
+}